proxycfg: remove dependency on cache.UpdateEvent (#13144)

OSS portion of enterprise PR 1857.

This removes (most) references to the `cache.UpdateEvent` type in the
`proxycfg` package.

As we're going to be direct usage of the agent cache with interfaces that
can be satisfied by alternative server-local datasources, it doesn't make
sense to depend on this type everywhere anymore (particularly on the
`state.ch` channel).

We also plan to extract `proxycfg` out of Consul into a shared library in
the future, which would require removing this dependency.

Aside from a fairly rote find-and-replace, the main change is that the
`cache.Cache` and `health.Client` types now accept a callback function
parameter, rather than a `chan<- cache.UpdateEvents`. This allows us to
do the type conversion without running another goroutine.
This commit is contained in:
Dan Upton 2022-05-20 15:47:40 +01:00 committed by GitHub
parent 5554a40c53
commit 30775ed54d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 319 additions and 230 deletions

View file

@ -626,8 +626,8 @@ func (a *Agent) Start(ctx context.Context) error {
// Start the proxy config manager. // Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache, Cache: &proxycfg.CacheWrapper{Cache: a.cache},
Health: a.rpcClientHealth, Health: &proxycfg.HealthWrapper{Health: a.rpcClientHealth},
Logger: a.logger.Named(logging.ProxyConfig), Logger: a.logger.Named(logging.ProxyConfig),
State: a.State, State: a.State,
Tokens: a.baseDeps.Tokens, Tokens: a.baseDeps.Tokens,

43
agent/cache/watch.go vendored
View file

@ -23,6 +23,9 @@ type UpdateEvent struct {
Err error Err error
} }
// Callback is the function type accepted by NotifyCallback.
type Callback func(ctx context.Context, event UpdateEvent)
// Notify registers a desire to be updated about changes to a cache result. // Notify registers a desire to be updated about changes to a cache result.
// //
// It is a helper that abstracts code from performing their own "blocking" query // It is a helper that abstracts code from performing their own "blocking" query
@ -56,6 +59,24 @@ func (c *Cache) Notify(
r Request, r Request,
correlationID string, correlationID string,
ch chan<- UpdateEvent, ch chan<- UpdateEvent,
) error {
return c.NotifyCallback(ctx, t, r, correlationID, func(ctx context.Context, event UpdateEvent) {
select {
case ch <- event:
case <-ctx.Done():
}
})
}
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (c *Cache) NotifyCallback(
ctx context.Context,
t string,
r Request,
correlationID string,
cb Callback,
) error { ) error {
c.typesLock.RLock() c.typesLock.RLock()
tEntry, ok := c.types[t] tEntry, ok := c.types[t]
@ -65,7 +86,7 @@ func (c *Cache) Notify(
} }
if tEntry.Opts.SupportsBlocking { if tEntry.Opts.SupportsBlocking {
go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb)
return nil return nil
} }
@ -73,11 +94,11 @@ func (c *Cache) Notify(
if info.MaxAge == 0 { if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
} }
go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb)
return nil return nil
} }
func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) {
// Always start at 0 index to deliver the initial (possibly currently cached // Always start at 0 index to deliver the initial (possibly currently cached
// value). // value).
index := uint64(0) index := uint64(0)
@ -101,12 +122,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati
// Check the index of the value returned in the cache entry to be sure it // Check the index of the value returned in the cache entry to be sure it
// changed // changed
if index == 0 || index < meta.Index { if index == 0 || index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err} cb(ctx, UpdateEvent{correlationID, res, meta, err})
select {
case ch <- u:
case <-ctx.Done():
return
}
// Update index for next request // Update index for next request
index = meta.Index index = meta.Index
@ -143,7 +159,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati
} }
} }
func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) {
index := uint64(0) index := uint64(0)
failures := uint(0) failures := uint(0)
@ -166,12 +182,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio
// Check for a change in the value or an index change // Check for a change in the value or an index change
if index < meta.Index || !reflect.DeepEqual(lastValue, res) { if index < meta.Index || !reflect.DeepEqual(lastValue, res) {
u := UpdateEvent{correlationID, res, meta, err} cb(ctx, UpdateEvent{correlationID, res, meta, err})
select {
case ch <- u:
case <-ctx.Done():
return
}
// Update index and lastValue // Update index and lastValue
lastValue = res lastValue = res

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
@ -220,7 +219,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, nil return snap, nil
} }
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil { if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err) return fmt.Errorf("error filling agent cache: %v", u.Err)
} }

60
agent/proxycfg/glue.go Normal file
View file

@ -0,0 +1,60 @@
// TODO(agentless): these glue types belong in the agent package, but moving
// them is a little tricky because the proxycfg tests use them. It should be
// easier to break apart once we no longer depend on cache.Notify directly.
package proxycfg
import (
"context"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
)
// HealthWrapper wraps health.Client so that the rest of the proxycfg package
// doesn't need to reference cache.UpdateEvent (it will be extracted into a
// shared library in the future).
type HealthWrapper struct {
Health *health.Client
}
func (w *HealthWrapper) Notify(
ctx context.Context,
req structs.ServiceSpecificRequest,
correlationID string,
ch chan<- UpdateEvent,
) error {
return w.Health.Notify(ctx, req, correlationID, dispatchCacheUpdate(ctx, ch))
}
// CacheWrapper wraps cache.Cache so that the rest of the proxycfg package
// doesn't need to reference cache.UpdateEvent (it will be extracted into a
// shared library in the future).
type CacheWrapper struct {
Cache *cache.Cache
}
func (w *CacheWrapper) Notify(
ctx context.Context,
t string,
req cache.Request,
correlationID string,
ch chan<- UpdateEvent,
) error {
return w.Cache.NotifyCallback(ctx, t, req, correlationID, dispatchCacheUpdate(ctx, ch))
}
func dispatchCacheUpdate(ctx context.Context, ch chan<- UpdateEvent) cache.Callback {
return func(ctx context.Context, e cache.UpdateEvent) {
u := UpdateEvent{
CorrelationID: e.CorrelationID,
Result: e.Result,
Err: e.Err,
}
select {
case ch <- u:
case <-ctx.Done():
}
}
}

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -70,7 +69,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
return snap, nil return snap, nil
} }
func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil { if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err) return fmt.Errorf("error filling agent cache: %v", u.Err)
} }

View file

@ -6,7 +6,6 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
@ -59,7 +58,7 @@ type Manager struct {
type ManagerConfig struct { type ManagerConfig struct {
// Cache is the agent's cache instance that can be used to retrieve, store and // Cache is the agent's cache instance that can be used to retrieve, store and
// monitor state for the proxies. // monitor state for the proxies.
Cache *cache.Cache Cache CacheNotifier
// Health provides service health updates on a notification channel. // Health provides service health updates on a notification channel.
Health Health Health Health
// state is the agent's local state to be watched for new proxy registrations. // state is the agent's local state to be watched for new proxy registrations.

View file

@ -376,8 +376,8 @@ func testManager_BasicLifecycle(
// Create manager // Create manager
m, err := NewManager(ManagerConfig{ m, err := NewManager(ManagerConfig{
Cache: c, Cache: &CacheWrapper{c},
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}},
State: state, State: state,
Source: source, Source: source,
Logger: logger, Logger: logger,
@ -509,7 +509,7 @@ func TestManager_deliverLatest(t *testing.T) {
// None of these need to do anything to test this method just be valid // None of these need to do anything to test this method just be valid
logger := testutil.Logger(t) logger := testutil.Logger(t)
cfg := ManagerConfig{ cfg := ManagerConfig{
Cache: cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2}), Cache: &CacheWrapper{cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})},
State: local.NewState(local.Config{}, logger, &token.Store{}), State: local.NewState(local.Config{}, logger, &token.Store{}),
Source: &structs.QuerySource{ Source: &structs.QuerySource{
Node: "node1", Node: "node1",
@ -581,8 +581,8 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {
state.TriggerSyncChanges = func() {} state.TriggerSyncChanges = func() {}
m, err := NewManager(ManagerConfig{ m, err := NewManager(ManagerConfig{
Cache: c, Cache: &CacheWrapper{c},
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}},
State: state, State: state,
Tokens: tokens, Tokens: tokens,
Source: &structs.QuerySource{Datacenter: "dc1"}, Source: &structs.QuerySource{Datacenter: "dc1"},
@ -626,8 +626,8 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
state.TriggerSyncChanges = func() {} state.TriggerSyncChanges = func() {}
m, err := NewManager(ManagerConfig{ m, err := NewManager(ManagerConfig{
Cache: c, Cache: &CacheWrapper{c},
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}},
State: state, State: state,
Tokens: tokens, Tokens: tokens,
Source: &structs.QuerySource{Datacenter: "dc1"}, Source: &structs.QuerySource{Datacenter: "dc1"},
@ -673,7 +673,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
// update the leaf certs // update the leaf certs
roots, issuedCert := TestCerts(t) roots, issuedCert := TestCerts(t)
notifyCH <- cache.UpdateEvent{ notifyCH <- UpdateEvent{
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
Result: issuedCert, Result: issuedCert,
Err: nil, Err: nil,
@ -688,7 +688,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
} }
// update the root certs // update the root certs
notifyCH <- cache.UpdateEvent{ notifyCH <- UpdateEvent{
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
Err: nil, Err: nil,
@ -704,7 +704,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
} }
// update the mesh config entry // update the mesh config entry
notifyCH <- cache.UpdateEvent{ notifyCH <- UpdateEvent{
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{}, Result: &structs.ConfigEntryResponse{},
Err: nil, Err: nil,
@ -723,7 +723,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
readEvent <- true readEvent <- true
// update the intentions // update the intentions
notifyCH <- cache.UpdateEvent{ notifyCH <- UpdateEvent{
CorrelationID: intentionsWatchID, CorrelationID: intentionsWatchID,
Result: &structs.IndexedIntentionMatches{}, Result: &structs.IndexedIntentionMatches{},
Err: nil, Err: nil,
@ -741,7 +741,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) {
// send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case // send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
notifyCH <- cache.UpdateEvent{ notifyCH <- UpdateEvent{
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
Result: issuedCert, Result: issuedCert,
Err: nil, Err: nil,

View file

@ -6,7 +6,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
@ -119,7 +118,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
return nil return nil
} }
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil { if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err) return fmt.Errorf("error filling agent cache: %v", u.Err)
} }

View file

@ -7,14 +7,12 @@ import (
"context" "context"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
) )
func (s *handlerMeshGateway) initializeEntWatches(_ context.Context) error { func (s *handlerMeshGateway) initializeEntWatches(_ context.Context) error {
return nil return nil
} }
func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ cache.UpdateEvent, _ *ConfigSnapshot) error { func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ UpdateEvent, _ *ConfigSnapshot) error {
return nil return nil
} }

View file

@ -17,13 +17,21 @@ import (
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
) )
// UpdateEvent contains new data for a resource we are subscribed to (e.g. an
// agent cache entry).
type UpdateEvent struct {
CorrelationID string
Result interface{}
Err error
}
type CacheNotifier interface { type CacheNotifier interface {
Notify(ctx context.Context, t string, r cache.Request, Notify(ctx context.Context, t string, r cache.Request,
correlationID string, ch chan<- cache.UpdateEvent) error correlationID string, ch chan<- UpdateEvent) error
} }
type Health interface { type Health interface {
Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
} }
const ( const (
@ -72,7 +80,7 @@ type state struct {
// in Watch. // in Watch.
cancel func() cancel func()
ch chan cache.UpdateEvent ch chan UpdateEvent
snapCh chan ConfigSnapshot snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot reqCh chan chan *ConfigSnapshot
} }
@ -153,7 +161,7 @@ func newState(ns *structs.NodeService, token string, config stateConfig) (*state
// conservative to handle larger numbers of upstreams correctly but gives // conservative to handle larger numbers of upstreams correctly but gives
// some head room for normal operation to be non-blocking in most typical // some head room for normal operation to be non-blocking in most typical
// cases. // cases.
ch := make(chan cache.UpdateEvent, 10) ch := make(chan UpdateEvent, 10)
s, err := newServiceInstanceFromNodeService(ns, token) s, err := newServiceInstanceFromNodeService(ns, token)
if err != nil { if err != nil {
@ -175,7 +183,7 @@ func newState(ns *structs.NodeService, token string, config stateConfig) (*state
}, nil }, nil
} }
func newKindHandler(config stateConfig, s serviceInstance, ch chan cache.UpdateEvent) (kindHandler, error) { func newKindHandler(config stateConfig, s serviceInstance, ch chan UpdateEvent) (kindHandler, error) {
var handler kindHandler var handler kindHandler
h := handlerState{stateConfig: config, serviceInstance: s, ch: ch} h := handlerState{stateConfig: config, serviceInstance: s, ch: ch}
@ -228,7 +236,7 @@ func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (s
type kindHandler interface { type kindHandler interface {
initialize(ctx context.Context) (ConfigSnapshot, error) initialize(ctx context.Context) (ConfigSnapshot, error)
handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error
} }
// Watch initialized watches on all necessary cache data for the current proxy // Watch initialized watches on all necessary cache data for the current proxy
@ -261,7 +269,7 @@ func (s *state) Close() error {
type handlerState struct { type handlerState struct {
stateConfig // TODO: un-embed stateConfig // TODO: un-embed
serviceInstance // TODO: un-embed serviceInstance // TODO: un-embed
ch chan cache.UpdateEvent ch chan UpdateEvent
} }
func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) ConfigSnapshot { func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) ConfigSnapshot {
@ -450,7 +458,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C
type gatewayWatchOpts struct { type gatewayWatchOpts struct {
notifier CacheNotifier notifier CacheNotifier
notifyCh chan cache.UpdateEvent notifyCh chan UpdateEvent
source structs.QuerySource source structs.QuerySource
token string token string
key GatewayKey key GatewayKey

View file

@ -126,7 +126,7 @@ func TestStateChanged(t *testing.T) {
type testCacheNotifierRequest struct { type testCacheNotifierRequest struct {
cacheType string cacheType string
request cache.Request request cache.Request
ch chan<- cache.UpdateEvent cb func(UpdateEvent)
} }
type testCacheNotifier struct { type testCacheNotifier struct {
@ -140,9 +140,23 @@ func newTestCacheNotifier() *testCacheNotifier {
} }
} }
func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- cache.UpdateEvent) error { func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- UpdateEvent) error {
cn.lock.Lock() cn.lock.Lock()
cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, ch} cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { ch <- event }}
cn.lock.Unlock()
return nil
}
// NotifyCallback satisfies the health.CacheGetter interface.
func (cn *testCacheNotifier) NotifyCallback(ctx context.Context, t string, r cache.Request, correlationId string, cb cache.Callback) error {
cn.lock.Lock()
cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) {
cb(ctx, cache.UpdateEvent{
CorrelationID: event.CorrelationID,
Result: event.Result,
Err: event.Err,
})
}}
cn.lock.Unlock() cn.lock.Unlock()
return nil return nil
} }
@ -159,20 +173,16 @@ func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId stri
return req return req
} }
func (cn *testCacheNotifier) getChanForCorrelationId(t testing.TB, correlationId string) chan<- cache.UpdateEvent { func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event UpdateEvent) {
req := cn.getNotifierRequest(t, correlationId) req := cn.getNotifierRequest(t, correlationId)
require.NotNil(t, req.ch) require.NotNil(t, req.cb)
return req.ch req.cb(event)
}
func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event cache.UpdateEvent) {
cn.getChanForCorrelationId(t, correlationId) <- event
} }
func (cn *testCacheNotifier) verifyWatch(t testing.TB, correlationId string) (string, cache.Request) { func (cn *testCacheNotifier) verifyWatch(t testing.TB, correlationId string) (string, cache.Request) {
// t.Logf("Watches: %+v", cn.notifiers) // t.Logf("Watches: %+v", cn.notifiers)
req := cn.getNotifierRequest(t, correlationId) req := cn.getNotifierRequest(t, correlationId)
require.NotNil(t, req.ch) require.NotNil(t, req.cb)
return req.cacheType, req.request return req.cacheType, req.request
} }
@ -348,7 +358,7 @@ func genVerifyConfigEntryWatch(expectedKind, expectedName, expectedDatacenter st
} }
} }
func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent { func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) UpdateEvent {
e := &structs.IngressGatewayConfigEntry{ e := &structs.IngressGatewayConfigEntry{
TLS: structs.GatewayTLSConfig{ TLS: structs.GatewayTLSConfig{
Enabled: gwTLS, Enabled: gwTLS,
@ -371,7 +381,7 @@ func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent {
} }
} }
return cache.UpdateEvent{ return UpdateEvent{
CorrelationID: gatewayConfigWatchID, CorrelationID: gatewayConfigWatchID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
Entry: e, Entry: e,
@ -420,8 +430,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName // TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a" extApiUID.Peer = "peer-a"
rootWatchEvent := func() cache.UpdateEvent { rootWatchEvent := func() UpdateEvent {
return cache.UpdateEvent{ return UpdateEvent{
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: indexedRoots, Result: indexedRoots,
Err: nil, Err: nil,
@ -430,7 +440,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
type verificationStage struct { type verificationStage struct {
requiredWatches map[string]verifyWatchRequest requiredWatches map[string]verifyWatchRequest
events []cache.UpdateEvent events []UpdateEvent
verifySnapshot func(t testing.TB, snap *ConfigSnapshot) verifySnapshot func(t testing.TB, snap *ConfigSnapshot)
} }
@ -562,7 +572,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
@ -752,7 +762,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
}, },
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
@ -768,7 +778,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: serviceListWatchID, CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{ Result: &structs.IndexedServiceList{
@ -807,7 +817,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
serviceListWatchID: genVerifyListServicesWatch("dc1"), serviceListWatchID: genVerifyListServicesWatch("dc1"),
datacentersWatchID: verifyDatacentersWatch, datacentersWatchID: verifyDatacentersWatch,
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: serviceListWatchID, CorrelationID: serviceListWatchID,
@ -826,7 +836,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: serviceListWatchID, CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{ Result: &structs.IndexedServiceList{
@ -845,7 +855,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "mesh-gateway:dc4", CorrelationID: "mesh-gateway:dc4",
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
@ -889,7 +899,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: federationStateListGatewaysWatchID, CorrelationID: federationStateListGatewaysWatchID,
Result: &structs.DatacenterIndexedCheckServiceNodes{ Result: &structs.DatacenterIndexedCheckServiceNodes{
@ -958,7 +968,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
@ -971,7 +981,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
ingressConfigWatchEvent(false, false), ingressConfigWatchEvent(false, false),
}, },
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
@ -981,7 +991,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1022,7 +1032,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
leafWatchID: genVerifyLeafWatch("ingress-gateway", "dc1"), leafWatchID: genVerifyLeafWatch("ingress-gateway", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
Result: issuedCert, Result: issuedCert,
@ -1044,7 +1054,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "discovery-chain:" + apiUID.String(), CorrelationID: "discovery-chain:" + apiUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
@ -1062,7 +1072,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true), "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -1121,7 +1131,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
@ -1169,7 +1179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
"*.ingress.dc1.alt.consul.", "*.ingress.dc1.alt.consul.",
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{}, Result: &structs.IndexedGatewayServices{},
@ -1201,7 +1211,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
@ -1262,7 +1272,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
"*.ingress.dc1.alt.consul.", "*.ingress.dc1.alt.consul.",
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{}, Result: &structs.IndexedGatewayServices{},
@ -1302,7 +1312,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
@ -1337,7 +1347,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID, gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID,
"terminating-gateway", "", "dc1", false), "terminating-gateway", "", "dc1", false),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
@ -1365,7 +1375,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1426,7 +1436,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"external-service:" + db.String(): genVerifyServiceWatch("db", "", "dc1", false), "external-service:" + db.String(): genVerifyServiceWatch("db", "", "dc1", false),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "external-service:" + db.String(), CorrelationID: "external-service:" + db.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -1471,7 +1481,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false), "external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "external-service:" + api.String(), CorrelationID: "external-service:" + api.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -1564,7 +1574,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"service-leaf:" + db.String(): genVerifyLeafWatch("db", "dc1"), "service-leaf:" + db.String(): genVerifyLeafWatch("db", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "service-leaf:" + db.String(), CorrelationID: "service-leaf:" + db.String(),
Result: issuedCert, Result: issuedCert,
@ -1582,7 +1592,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
serviceIntentionsIDPrefix + db.String(): genVerifyIntentionWatch("db", "dc1"), serviceIntentionsIDPrefix + db.String(): genVerifyIntentionWatch("db", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: serviceIntentionsIDPrefix + db.String(), CorrelationID: serviceIntentionsIDPrefix + db.String(),
Result: dbIxnMatch, Result: dbIxnMatch,
@ -1603,7 +1613,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
serviceConfigIDPrefix + db.String(): genVerifyResolvedConfigWatch("db", "dc1"), serviceConfigIDPrefix + db.String(): genVerifyResolvedConfigWatch("db", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: serviceConfigIDPrefix + db.String(), CorrelationID: serviceConfigIDPrefix + db.String(),
Result: dbConfig, Result: dbConfig,
@ -1622,7 +1632,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"service-resolver:" + db.String(): genVerifyResolverWatch("db", "dc1", structs.ServiceResolver), "service-resolver:" + db.String(): genVerifyResolverWatch("db", "dc1", structs.ServiceResolver),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "service-resolver:" + db.String(), CorrelationID: "service-resolver:" + db.String(),
Result: dbResolver, Result: dbResolver,
@ -1642,7 +1652,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1730,7 +1740,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
@ -1814,7 +1824,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
// Valid snapshot after roots, leaf, and intentions // Valid snapshot after roots, leaf, and intentions
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
@ -1857,7 +1867,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
leafWatchID: genVerifyLeafWatch("api", "dc1"), leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: intentionUpstreamsID, CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{ Result: &structs.IndexedServiceList{
@ -1900,7 +1910,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "discovery-chain:" + dbUID.String(), CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
@ -1918,7 +1928,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
requiredWatches: map[string]verifyWatchRequest{ requiredWatches: map[string]verifyWatchRequest{
"upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true), "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -2069,7 +2079,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "discovery-chain:" + dbUID.String(), CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
@ -2096,7 +2106,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
{ {
// Receive a new upstream target event without proxy1. // Receive a new upstream target event without proxy1.
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -2177,7 +2187,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
{ {
// Receive a new upstream target event with a conflicting passthrough address // Receive a new upstream target event with a conflicting passthrough address
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -2259,7 +2269,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
{ {
// Event with no nodes should clean up addrs // Event with no nodes should clean up addrs
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
@ -2289,7 +2299,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
leafWatchID: genVerifyLeafWatch("api", "dc1"), leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: intentionUpstreamsID, CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{ Result: &structs.IndexedServiceList{
@ -2382,7 +2392,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
// Valid snapshot after roots, leaf, and intentions // Valid snapshot after roots, leaf, and intentions
{ {
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
@ -2428,7 +2438,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(), CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
@ -2465,7 +2475,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}), }),
}, },
events: []cache.UpdateEvent{ events: []UpdateEvent{
{ {
CorrelationID: intentionUpstreamsID, CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{ Result: &structs.IndexedServiceList{
@ -2557,7 +2567,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
{ {
// This time add the events // This time add the events
events: []cache.UpdateEvent{ events: []UpdateEvent{
rootWatchEvent(), rootWatchEvent(),
{ {
CorrelationID: leafWatchID, CorrelationID: leafWatchID,
@ -2607,7 +2617,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
state, err := newState(&tc.ns, "", stateConfig{ state, err := newState(&tc.ns, "", stateConfig{
logger: testutil.Logger(t), logger: testutil.Logger(t),
cache: cn, cache: cn,
health: &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}, health: &HealthWrapper{&health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}},
source: &structs.QuerySource{ source: &structs.QuerySource{
Datacenter: tc.sourceDC, Datacenter: tc.sourceDC,
}, },

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -68,7 +67,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps
return snap, nil return snap, nil
} }
func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil { if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err) return fmt.Errorf("error filling agent cache: %v", u.Err)
} }

View file

@ -672,7 +672,7 @@ type noopCacheNotifier struct{}
var _ CacheNotifier = (*noopCacheNotifier)(nil) var _ CacheNotifier = (*noopCacheNotifier)(nil)
func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- UpdateEvent) error {
return nil return nil
} }
@ -680,7 +680,7 @@ type noopHealth struct{}
var _ Health = (*noopHealth)(nil) var _ Health = (*noopHealth)(nil)
func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- cache.UpdateEvent) error { func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- UpdateEvent) error {
return nil return nil
} }
@ -698,7 +698,7 @@ func testConfigSnapshotFixture(
ns *structs.NodeService, ns *structs.NodeService,
nsFn func(ns *structs.NodeService), nsFn func(ns *structs.NodeService),
serverSNIFn ServerSNIFunc, serverSNIFn ServerSNIFunc,
updates []cache.UpdateEvent, updates []UpdateEvent,
) *ConfigSnapshot { ) *ConfigSnapshot {
const token = "" const token = ""
@ -750,15 +750,15 @@ func testConfigSnapshotFixture(
return &snap return &snap
} }
func testSpliceEvents(base, extra []cache.UpdateEvent) []cache.UpdateEvent { func testSpliceEvents(base, extra []UpdateEvent) []UpdateEvent {
if len(extra) == 0 { if len(extra) == 0 {
return base return base
} }
var ( var (
hasExtra = make(map[string]cache.UpdateEvent) hasExtra = make(map[string]UpdateEvent)
completeExtra = make(map[string]struct{}) completeExtra = make(map[string]struct{})
allEvents []cache.UpdateEvent allEvents []UpdateEvent
) )
for _, e := range extra { for _, e := range extra {

View file

@ -4,14 +4,13 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
// TestConfigSnapshot returns a fully populated snapshot // TestConfigSnapshot returns a fully populated snapshot
func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUpdates []cache.UpdateEvent) *ConfigSnapshot { func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUpdates []UpdateEvent) *ConfigSnapshot {
roots, leaf := TestCerts(t) roots, leaf := TestCerts(t)
// no entries implies we'll get a default chain // no entries implies we'll get a default chain
@ -29,7 +28,7 @@ func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUp
webSN = structs.ServiceIDString("web", nil) webSN = structs.ServiceIDString("web", nil)
) )
baseEvents := []cache.UpdateEvent{ baseEvents := []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -94,7 +93,7 @@ func TestConfigSnapshotDiscoveryChain(
t testing.T, t testing.T,
variation string, variation string,
nsFn func(ns *structs.NodeService), nsFn func(ns *structs.NodeService),
extraUpdates []cache.UpdateEvent, extraUpdates []UpdateEvent,
additionalEntries ...structs.ConfigEntry, additionalEntries ...structs.ConfigEntry,
) *ConfigSnapshot { ) *ConfigSnapshot {
roots, leaf := TestCerts(t) roots, leaf := TestCerts(t)
@ -108,7 +107,7 @@ func TestConfigSnapshotDiscoveryChain(
webSN = structs.ServiceIDString("web", nil) webSN = structs.ServiceIDString("web", nil)
) )
baseEvents := testSpliceEvents([]cache.UpdateEvent{ baseEvents := testSpliceEvents([]UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -171,7 +170,7 @@ func TestConfigSnapshotExposeConfig(t testing.T, nsFn func(ns *structs.NodeServi
webSN = structs.ServiceIDString("web", nil) webSN = structs.ServiceIDString("web", nil)
) )
baseEvents := []cache.UpdateEvent{ baseEvents := []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -252,7 +251,7 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot {
}, },
Meta: nil, Meta: nil,
TaggedAddresses: nil, TaggedAddresses: nil,
}, nil, nil, []cache.UpdateEvent{ }, nil, nil, []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,

View file

@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -21,7 +20,7 @@ func TestConfigSnapshotIngressGateway(
variation string, variation string,
nsFn func(ns *structs.NodeService), nsFn func(ns *structs.NodeService),
configFn func(entry *structs.IngressGatewayConfigEntry), configFn func(entry *structs.IngressGatewayConfigEntry),
extraUpdates []cache.UpdateEvent, extraUpdates []UpdateEvent,
additionalEntries ...structs.ConfigEntry, additionalEntries ...structs.ConfigEntry,
) *ConfigSnapshot { ) *ConfigSnapshot {
roots, placeholderLeaf := TestCerts(t) roots, placeholderLeaf := TestCerts(t)
@ -47,7 +46,7 @@ func TestConfigSnapshotIngressGateway(
configFn(entry) configFn(entry)
} }
baseEvents := []cache.UpdateEvent{ baseEvents := []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -71,7 +70,7 @@ func TestConfigSnapshotIngressGateway(
} }
if populateServices { if populateServices {
baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{{
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
Services: []*structs.GatewayService{ Services: []*structs.GatewayService{
@ -155,7 +154,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -270,7 +269,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel_HTTP(t testing.
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -344,7 +343,7 @@ func TestConfigSnapshotIngressGatewaySDS_ServiceLevel(t testing.T) *ConfigSnapsh
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -434,7 +433,7 @@ func TestConfigSnapshotIngressGatewaySDS_ListenerAndServiceLevel(t testing.T) *C
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -519,7 +518,7 @@ func TestConfigSnapshotIngressGatewaySDS_MixedNoTLS(t testing.T) *ConfigSnapshot
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -601,7 +600,7 @@ func TestConfigSnapshotIngressGateway_MixedListeners(t testing.T) *ConfigSnapsho
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -717,7 +716,7 @@ func TestConfigSnapshotIngress_HTTPMultipleServices(t testing.T) *ConfigSnapshot
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -830,7 +829,7 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -893,7 +892,7 @@ func TestConfigSnapshotIngressGatewayWithChain(
} }
var ( var (
updates []cache.UpdateEvent updates []UpdateEvent
configFn func(entry *structs.IngressGatewayConfigEntry) configFn func(entry *structs.IngressGatewayConfigEntry)
populateServices bool populateServices bool
@ -1088,7 +1087,7 @@ func TestConfigSnapshotIngressGatewayWithChain(
fooEntMeta.PartitionOrDefault(), "dc1", fooEntMeta.PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...) connect.TestClusterID+".consul", nil, entries...)
updates = []cache.UpdateEvent{ updates = []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1218,7 +1217,7 @@ func TestConfigSnapshotIngressGateway_TLSMinVersionListenersGatewayDefaults(t te
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1336,7 +1335,7 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener(t testing.T) *ConfigSnap
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -1436,7 +1435,7 @@ func TestConfigSnapshotIngressGateway_TLSMixedMinVersionListeners(t testing.T) *
}, },
}, },
} }
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{

View file

@ -6,11 +6,10 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *structs.NodeService), extraUpdates []cache.UpdateEvent) *ConfigSnapshot { func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *structs.NodeService), extraUpdates []UpdateEvent) *ConfigSnapshot {
roots, _ := TestCerts(t) roots, _ := TestCerts(t)
var ( var (
@ -38,7 +37,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
useFederationStates = false useFederationStates = false
deleteCrossDCEntry = false deleteCrossDCEntry = false
case "service-subsets": case "service-subsets":
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceResolversWatchID, CorrelationID: serviceResolversWatchID,
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -60,7 +59,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "service-subsets2": // TODO(rb): make this merge with 'service-subsets' case "service-subsets2": // TODO(rb): make this merge with 'service-subsets'
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceResolversWatchID, CorrelationID: serviceResolversWatchID,
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -95,7 +94,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "default-service-subsets2": // TODO(rb): rename to strip the 2 when the prior is merged with 'service-subsets' case "default-service-subsets2": // TODO(rb): rename to strip the 2 when the prior is merged with 'service-subsets'
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceResolversWatchID, CorrelationID: serviceResolversWatchID,
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -132,7 +131,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "ignore-extra-resolvers": case "ignore-extra-resolvers":
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceResolversWatchID, CorrelationID: serviceResolversWatchID,
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -169,7 +168,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "service-timeouts": case "service-timeouts":
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceResolversWatchID, CorrelationID: serviceResolversWatchID,
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -192,7 +191,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "non-hash-lb-injected": case "non-hash-lb-injected":
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: "service-resolvers", // serviceResolversWatchID CorrelationID: "service-resolvers", // serviceResolversWatchID
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -220,7 +219,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}, },
}) })
case "hash-lb-ignored": case "hash-lb-ignored":
extraUpdates = append(extraUpdates, cache.UpdateEvent{ extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: "service-resolvers", // serviceResolversWatchID CorrelationID: "service-resolvers", // serviceResolversWatchID
Result: &structs.IndexedConfigEntries{ Result: &structs.IndexedConfigEntries{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -253,7 +252,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
return nil return nil
} }
baseEvents := []cache.UpdateEvent{ baseEvents := []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -278,7 +277,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
} }
if populateServices || useFederationStates { if populateServices || useFederationStates {
baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
{ {
CorrelationID: datacentersWatchID, CorrelationID: datacentersWatchID,
Result: &[]string{"dc1", "dc2", "dc4", "dc6"}, Result: &[]string{"dc1", "dc2", "dc4", "dc6"},
@ -291,7 +290,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
foo = structs.NewServiceName("foo", nil) foo = structs.NewServiceName("foo", nil)
bar = structs.NewServiceName("bar", nil) bar = structs.NewServiceName("bar", nil)
) )
baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
{ {
CorrelationID: "mesh-gateway:dc2", CorrelationID: "mesh-gateway:dc2",
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
@ -349,7 +348,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
}) })
if deleteCrossDCEntry { if deleteCrossDCEntry {
baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
{ {
// Have the cross-dc query mechanism not work for dc2 so // Have the cross-dc query mechanism not work for dc2 so
// fedstates will infill. // fedstates will infill.
@ -399,7 +398,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
} }
} }
baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
{ {
CorrelationID: federationStateListGatewaysWatchID, CorrelationID: federationStateListGatewaysWatchID,
Result: &structs.DatacenterIndexedCheckServiceNodes{ Result: &structs.DatacenterIndexedCheckServiceNodes{

View file

@ -3,8 +3,6 @@ package proxycfg
import ( import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/cache"
agentcache "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -12,7 +10,7 @@ func TestConfigSnapshotTerminatingGateway(
t testing.T, t testing.T,
populateServices bool, populateServices bool,
nsFn func(ns *structs.NodeService), nsFn func(ns *structs.NodeService),
extraUpdates []agentcache.UpdateEvent, extraUpdates []UpdateEvent,
) *ConfigSnapshot { ) *ConfigSnapshot {
roots, _ := TestCerts(t) roots, _ := TestCerts(t)
@ -23,7 +21,7 @@ func TestConfigSnapshotTerminatingGateway(
cache = structs.NewServiceName("cache", nil) cache = structs.NewServiceName("cache", nil)
) )
baseEvents := []agentcache.UpdateEvent{ baseEvents := []UpdateEvent{
{ {
CorrelationID: rootsWatchID, CorrelationID: rootsWatchID,
Result: roots, Result: roots,
@ -158,7 +156,7 @@ func TestConfigSnapshotTerminatingGateway(
}, },
} }
baseEvents = testSpliceEvents(baseEvents, []agentcache.UpdateEvent{ baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
{ {
CorrelationID: gatewayServicesWatchID, CorrelationID: gatewayServicesWatchID,
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -356,7 +354,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
cache = structs.NewServiceName("cache", nil) cache = structs.NewServiceName("cache", nil)
) )
events := []agentcache.UpdateEvent{ events := []UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -384,7 +382,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
} }
if alsoAdjustCache { if alsoAdjustCache {
events = testSpliceEvents(events, []agentcache.UpdateEvent{ events = testSpliceEvents(events, []UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + cache.String(), CorrelationID: serviceResolverIDPrefix + cache.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -414,7 +412,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
func TestConfigSnapshotTerminatingGatewayDefaultServiceSubset(t testing.T) *ConfigSnapshot { func TestConfigSnapshotTerminatingGatewayDefaultServiceSubset(t testing.T) *ConfigSnapshot {
web := structs.NewServiceName("web", nil) web := structs.NewServiceName("web", nil)
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -498,7 +496,7 @@ func testConfigSnapshotTerminatingGatewayLBConfig(t testing.T, variant string) *
return nil return nil
} }
return TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{
{ {
CorrelationID: serviceConfigIDPrefix + web.String(), CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
@ -521,7 +519,7 @@ func testConfigSnapshotTerminatingGatewayLBConfig(t testing.T, variant string) *
} }
func TestConfigSnapshotTerminatingGatewaySNI(t testing.T) *ConfigSnapshot { func TestConfigSnapshotTerminatingGatewaySNI(t testing.T) *ConfigSnapshot {
return TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{
{ {
CorrelationID: "gateway-services", CorrelationID: "gateway-services",
Result: &structs.IndexedGatewayServices{ Result: &structs.IndexedGatewayServices{
@ -550,7 +548,7 @@ func TestConfigSnapshotTerminatingGatewayHostnameSubsets(t testing.T) *ConfigSna
cache = structs.NewServiceName("cache", nil) cache = structs.NewServiceName("cache", nil)
) )
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + api.String(), CorrelationID: serviceResolverIDPrefix + api.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -600,7 +598,7 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
notfound = structs.NewServiceName("notfound", nil) notfound = structs.NewServiceName("notfound", nil)
) )
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -648,9 +646,9 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
}) })
} }
func TestConfigSnapshotTerminatingGatewayWithLambdaService(t testing.T, extraUpdateEvents ...agentcache.UpdateEvent) *ConfigSnapshot { func TestConfigSnapshotTerminatingGatewayWithLambdaService(t testing.T, extraUpdateEvents ...UpdateEvent) *ConfigSnapshot {
web := structs.NewServiceName("web", nil) web := structs.NewServiceName("web", nil)
updateEvents := append(extraUpdateEvents, agentcache.UpdateEvent{ updateEvents := append(extraUpdateEvents, UpdateEvent{
CorrelationID: serviceConfigIDPrefix + web.String(), CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{"protocol": "http"}, ProxyConfig: map[string]interface{}{"protocol": "http"},
@ -669,7 +667,7 @@ func TestConfigSnapshotTerminatingGatewayWithLambdaServiceAndServiceResolvers(t
web := structs.NewServiceName("web", nil) web := structs.NewServiceName("web", nil)
return TestConfigSnapshotTerminatingGatewayWithLambdaService(t, return TestConfigSnapshotTerminatingGatewayWithLambdaService(t,
agentcache.UpdateEvent{ UpdateEvent{
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceResolverConfigEntry{ Entry: &structs.ServiceResolverConfigEntry{

View file

@ -5,7 +5,6 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -28,7 +27,7 @@ func TestConfigSnapshotTransparentProxy(t testing.T) *ConfigSnapshot {
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Mode = structs.ProxyModeTransparent ns.Proxy.Mode = structs.ProxyModeTransparent
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -141,7 +140,7 @@ func TestConfigSnapshotTransparentProxyHTTPUpstream(t testing.T) *ConfigSnapshot
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Mode = structs.ProxyModeTransparent ns.Proxy.Mode = structs.ProxyModeTransparent
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -245,7 +244,7 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Mode = structs.ProxyModeTransparent ns.Proxy.Mode = structs.ProxyModeTransparent
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -335,7 +334,7 @@ func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Mode = structs.ProxyModeTransparent ns.Proxy.Mode = structs.ProxyModeTransparent
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -473,7 +472,7 @@ func TestConfigSnapshotTransparentProxyTerminatingGatewayCatalogDestinationsOnly
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Mode = structs.ProxyModeTransparent ns.Proxy.Mode = structs.ProxyModeTransparent
}, []cache.UpdateEvent{ }, []UpdateEvent{
{ {
CorrelationID: meshConfigEntryID, CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{

View file

@ -5,7 +5,6 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -16,7 +15,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
variation string, variation string,
upstreams structs.Upstreams, upstreams structs.Upstreams,
additionalEntries ...structs.ConfigEntry, additionalEntries ...structs.ConfigEntry,
) []cache.UpdateEvent { ) []UpdateEvent {
var ( var (
dbUpstream = upstreams[0] dbUpstream = upstreams[0]
@ -25,7 +24,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
dbChain := setupTestVariationDiscoveryChain(t, variation, additionalEntries...) dbChain := setupTestVariationDiscoveryChain(t, variation, additionalEntries...)
events := []cache.UpdateEvent{ events := []UpdateEvent{
{ {
CorrelationID: "discovery-chain:" + dbUID.String(), CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
@ -46,14 +45,14 @@ func setupTestVariationConfigEntriesAndSnapshot(
case "simple": case "simple":
case "external-sni": case "external-sni":
case "failover": case "failover":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:fail.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:fail.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesAlternate(t), Nodes: TestUpstreamNodesAlternate(t),
}, },
}) })
case "failover-through-remote-gateway-triggered": case "failover-through-remote-gateway-triggered":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatus(t, "critical"), Nodes: TestUpstreamNodesInStatus(t, "critical"),
@ -61,26 +60,26 @@ func setupTestVariationConfigEntriesAndSnapshot(
}) })
fallthrough fallthrough
case "failover-through-remote-gateway": case "failover-through-remote-gateway":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesDC2(t), Nodes: TestUpstreamNodesDC2(t),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(), CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
Nodes: TestGatewayNodesDC2(t), Nodes: TestGatewayNodesDC2(t),
}, },
}) })
case "failover-through-double-remote-gateway-triggered": case "failover-through-double-remote-gateway-triggered":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatus(t, "critical"), Nodes: TestUpstreamNodesInStatus(t, "critical"),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatusDC2(t, "critical"), Nodes: TestUpstreamNodesInStatusDC2(t, "critical"),
@ -88,26 +87,26 @@ func setupTestVariationConfigEntriesAndSnapshot(
}) })
fallthrough fallthrough
case "failover-through-double-remote-gateway": case "failover-through-double-remote-gateway":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesDC2(t), Nodes: TestUpstreamNodesDC2(t),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(), CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
Nodes: TestGatewayNodesDC2(t), Nodes: TestGatewayNodesDC2(t),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "mesh-gateway:dc3:" + dbUID.String(), CorrelationID: "mesh-gateway:dc3:" + dbUID.String(),
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
Nodes: TestGatewayNodesDC3(t), Nodes: TestGatewayNodesDC3(t),
}, },
}) })
case "failover-through-local-gateway-triggered": case "failover-through-local-gateway-triggered":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatus(t, "critical"), Nodes: TestUpstreamNodesInStatus(t, "critical"),
@ -115,26 +114,26 @@ func setupTestVariationConfigEntriesAndSnapshot(
}) })
fallthrough fallthrough
case "failover-through-local-gateway": case "failover-through-local-gateway":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesDC2(t), Nodes: TestUpstreamNodesDC2(t),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(), CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
Nodes: TestGatewayNodesDC1(t), Nodes: TestGatewayNodesDC1(t),
}, },
}) })
case "failover-through-double-local-gateway-triggered": case "failover-through-double-local-gateway-triggered":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatus(t, "critical"), Nodes: TestUpstreamNodesInStatus(t, "critical"),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesInStatusDC2(t, "critical"), Nodes: TestUpstreamNodesInStatusDC2(t, "critical"),
@ -142,26 +141,26 @@ func setupTestVariationConfigEntriesAndSnapshot(
}) })
fallthrough fallthrough
case "failover-through-double-local-gateway": case "failover-through-double-local-gateway":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(), CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesDC2(t), Nodes: TestUpstreamNodesDC2(t),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(), CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
Result: &structs.IndexedNodesWithGateways{ Result: &structs.IndexedNodesWithGateways{
Nodes: TestGatewayNodesDC1(t), Nodes: TestGatewayNodesDC1(t),
}, },
}) })
case "splitter-with-resolver-redirect-multidc": case "splitter-with-resolver-redirect-multidc":
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:v1.db.default.default.dc1:" + dbUID.String(), CorrelationID: "upstream-target:v1.db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodes(t, "db"), Nodes: TestUpstreamNodes(t, "db"),
}, },
}) })
events = append(events, cache.UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "upstream-target:v2.db.default.default.dc2:" + dbUID.String(), CorrelationID: "upstream-target:v2.db.default.default.dc2:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{ Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesDC2(t), Nodes: TestUpstreamNodesDC2(t),

View file

@ -9,7 +9,6 @@ import (
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -18,7 +17,7 @@ type handlerUpstreams struct {
handlerState handlerState
} }
func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil { if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err) return fmt.Errorf("error filling agent cache: %v", u.Err)
} }

View file

@ -26,12 +26,12 @@ type NetRPC interface {
type CacheGetter interface { type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error NotifyCallback(ctx context.Context, t string, r cache.Request, cID string, cb cache.Callback) error
} }
type MaterializedViewStore interface { type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error) Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error
} }
func (c *Client) ServiceNodes( func (c *Client) ServiceNodes(
@ -91,14 +91,14 @@ func (c *Client) Notify(
ctx context.Context, ctx context.Context,
req structs.ServiceSpecificRequest, req structs.ServiceSpecificRequest,
correlationID string, correlationID string,
ch chan<- cache.UpdateEvent, cb cache.Callback,
) error { ) error {
if c.useStreaming(req) { if c.useStreaming(req) {
sr := c.newServiceRequest(req) sr := c.newServiceRequest(req)
return c.ViewStore.Notify(ctx, sr, correlationID, ch) return c.ViewStore.NotifyCallback(ctx, sr, correlationID, cb)
} }
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) return c.Cache.NotifyCallback(ctx, c.CacheName, &req, correlationID, cb)
} }
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {

View file

@ -152,7 +152,7 @@ func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface
return result, cache.ResultMeta{}, nil return result, cache.ResultMeta{}, nil
} }
func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { func (f *fakeCache) NotifyCallback(_ context.Context, t string, _ cache.Request, _ string, _ cache.Callback) error {
f.calls = append(f.calls, t) f.calls = append(f.calls, t)
return nil return nil
} }
@ -175,7 +175,7 @@ func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatvi
return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil
} }
func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error { func (f *fakeViewStore) NotifyCallback(_ context.Context, req submatview.Request, _ string, _ cache.Callback) error {
f.calls = append(f.calls, req) f.calls = append(f.calls, req)
return nil return nil
} }

View file

@ -149,6 +149,23 @@ func (s *Store) Notify(
req Request, req Request,
correlationID string, correlationID string,
updateCh chan<- cache.UpdateEvent, updateCh chan<- cache.UpdateEvent,
) error {
return s.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
select {
case updateCh <- event:
case <-ctx.Done():
return
}
})
}
// NotifyCallback subscribes to updates of the entry identified by req in the
// same way as Notify, but accepts a callback function instead of a channel.
func (s *Store) NotifyCallback(
ctx context.Context,
req Request,
correlationID string,
cb cache.Callback,
) error { ) error {
info := req.CacheInfo() info := req.CacheInfo()
key, materializer, err := s.readEntry(req) key, materializer, err := s.readEntry(req)
@ -174,16 +191,11 @@ func (s *Store) Notify(
} }
index = result.Index index = result.Index
u := cache.UpdateEvent{ cb(ctx, cache.UpdateEvent{
CorrelationID: correlationID, CorrelationID: correlationID,
Result: result.Value, Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached}, Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
} })
select {
case updateCh <- u:
case <-ctx.Done():
return
}
} }
}() }()
return nil return nil

View file

@ -320,7 +320,12 @@ func (c *consumer) Consume(ctx context.Context, maxIndex uint64) error {
group, cctx := errgroup.WithContext(ctx) group, cctx := errgroup.WithContext(ctx)
group.Go(func() error { group.Go(func() error {
return c.healthClient.Notify(cctx, req, "", updateCh) return c.healthClient.Notify(cctx, req, "", func(ctx context.Context, event cache.UpdateEvent) {
select {
case updateCh <- event:
case <-ctx.Done():
}
})
}) })
group.Go(func() error { group.Go(func() error {
var idx uint64 var idx uint64

View file

@ -13,7 +13,6 @@ import (
testinf "github.com/mitchellh/go-testing-interface" testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
@ -41,7 +40,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-outgoing-min-version-auto", name: "connect-proxy-with-tls-outgoing-min-version-auto",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -60,7 +59,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-outgoing-min-version", name: "connect-proxy-with-tls-outgoing-min-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -79,7 +78,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-outgoing-max-version", name: "connect-proxy-with-tls-outgoing-max-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -98,7 +97,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-outgoing-cipher-suites", name: "connect-proxy-with-tls-outgoing-cipher-suites",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -406,7 +405,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "ingress-gateway-with-tls-outgoing-min-version", name: "ingress-gateway-with-tls-outgoing-min-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -425,7 +424,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "ingress-gateway-with-tls-outgoing-max-version", name: "ingress-gateway-with-tls-outgoing-max-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -444,7 +443,7 @@ func TestClustersFromSnapshot(t *testing.T) {
{ {
name: "ingress-gateway-with-tls-outgoing-cipher-suites", name: "ingress-gateway-with-tls-outgoing-cipher-suites",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{

View file

@ -13,7 +13,6 @@ import (
testinf "github.com/mitchellh/go-testing-interface" testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/agent/xds/proxysupport"
@ -46,7 +45,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-outgoing-min-version-auto", name: "connect-proxy-with-tls-outgoing-min-version-auto",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -65,7 +64,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-incoming-min-version", name: "connect-proxy-with-tls-incoming-min-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -84,7 +83,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-incoming-max-version", name: "connect-proxy-with-tls-incoming-max-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -103,7 +102,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "connect-proxy-with-tls-incoming-cipher-suites", name: "connect-proxy-with-tls-incoming-cipher-suites",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -173,7 +172,7 @@ func TestListenersFromSnapshot(t *testing.T) {
func(ns *structs.NodeService) { func(ns *structs.NodeService) {
ns.Proxy.Config["protocol"] = "http" ns.Proxy.Config["protocol"] = "http"
}, },
[]cache.UpdateEvent{ []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -580,7 +579,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "terminating-gateway-with-tls-incoming-min-version", name: "terminating-gateway-with-tls-incoming-min-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -599,7 +598,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "terminating-gateway-with-tls-incoming-max-version", name: "terminating-gateway-with-tls-incoming-max-version",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -618,7 +617,7 @@ func TestListenersFromSnapshot(t *testing.T) {
{ {
name: "terminating-gateway-with-tls-incoming-cipher-suites", name: "terminating-gateway-with-tls-incoming-cipher-suites",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "mesh", CorrelationID: "mesh",
Result: &structs.ConfigEntryResponse{ Result: &structs.ConfigEntryResponse{
@ -677,7 +676,7 @@ func TestListenersFromSnapshot(t *testing.T) {
name: "terminating-gateway-no-api-cert", name: "terminating-gateway-no-api-cert",
create: func(t testinf.T) *proxycfg.ConfigSnapshot { create: func(t testinf.T) *proxycfg.ConfigSnapshot {
api := structs.NewServiceName("api", nil) api := structs.NewServiceName("api", nil)
return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{
{ {
CorrelationID: "service-leaf:" + api.String(), // serviceLeafIDPrefix CorrelationID: "service-leaf:" + api.String(), // serviceLeafIDPrefix
Result: nil, // tombstone this Result: nil, // tombstone this