cache: prevent goroutine leak in agent cache (#14908)

There is a bug in the error handling code for the Agent cache subsystem discovered:

1. NotifyCallback calls notifyBlockingQuery which calls getWithIndex in
   a loop (which backs off on-error up to 1 minute)

2. getWithIndex calls fetch if there’s no valid entry in the cache

3. fetch starts a goroutine which calls Fetch on the cache-type, waits
   for a while (again with backoff up to 1 minute for errors) and then
   calls fetch to trigger a refresh

The end result being that every 1 minute notifyBlockingQuery spawns an
ancestry of goroutines that essentially lives forever.

This PR ensures that the goroutine started by `fetch` cancels any prior
goroutine spawned by the same line for the same key.

In isolated testing where a cache type was tweaked to indefinitely
error, this patch prevented goroutine counts from skyrocketing.
This commit is contained in:
R.B. Boyer 2022-10-17 14:38:10 -05:00 committed by GitHub
parent ca916eec32
commit 9f41cc4a25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 23 deletions

3
.changelog/14908.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
cache: prevent goroutine leak in agent cache
```

81
agent/cache/cache.go vendored
View File

@ -139,6 +139,10 @@ type Cache struct {
entries map[string]cacheEntry
entriesExpiryHeap *ttlcache.ExpiryHeap
fetchLock sync.Mutex
lastFetchID uint64
fetchHandles map[string]fetchHandle
// stopped is used as an atomic flag to signal that the Cache has been
// discarded so background fetches and expiry processing should stop.
stopped uint32
@ -150,6 +154,11 @@ type Cache struct {
rateLimitCancel context.CancelFunc
}
type fetchHandle struct {
id uint64
stopCh chan struct{}
}
// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
// Name that was used to register the Type
@ -225,6 +234,7 @@ func New(options Options) *Cache {
types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
fetchHandles: make(map[string]fetchHandle),
stopCh: make(chan struct{}),
options: options,
rateLimitContext: ctx,
@ -614,8 +624,18 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
tEntry := r.TypeEntry
// The actual Fetch must be performed in a goroutine.
go func() {
// The actual Fetch must be performed in a goroutine. Ensure that we only
// have one in-flight at a time, but don't use a deferred
// context.WithCancel style termination so that these things outlive their
// requester.
//
// By the time we get here the system WANTS to make a replacement fetcher, so
// we terminate the prior one and replace it.
handle := c.getOrReplaceFetchHandle(key)
go func(handle fetchHandle) {
defer c.deleteFetchHandle(key, handle.id)
// If we have background refresh and currently are in "disconnected" state,
// waiting for a response might mean we mark our results as stale for up to
// 10 minutes (max blocking timeout) after connection is restored. To reduce
@ -666,6 +686,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
connectedTimer.Stop()
}
// If we were stopped while waiting on a blocking query now would be a
// good time to detect that.
select {
case <-handle.stopCh:
return
default:
}
// Copy the existing entry to start.
newEntry := entry
newEntry.Fetching = false
@ -820,13 +848,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
}
// If we're over the attempt minimum, start an exponential backoff.
if wait := backOffWait(attempt); wait > 0 {
time.Sleep(wait)
}
wait := backOffWait(attempt)
// If we have a timer, wait for it
if tEntry.Opts.RefreshTimer > 0 {
time.Sleep(tEntry.Opts.RefreshTimer)
wait += tEntry.Opts.RefreshTimer
select {
case <-time.After(wait):
case <-handle.stopCh:
return
}
// Trigger. The "allowNew" field is false because in the time we were
@ -836,11 +866,46 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
r.Info.MinIndex = 0
c.fetch(key, r, false, attempt, true)
}
}()
}(handle)
return entry.Waiter
}
func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
c.fetchLock.Lock()
defer c.fetchLock.Unlock()
if prevHandle, ok := c.fetchHandles[key]; ok {
close(prevHandle.stopCh)
}
c.lastFetchID++
handle := fetchHandle{
id: c.lastFetchID,
stopCh: make(chan struct{}),
}
c.fetchHandles[key] = handle
return handle
}
func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
c.fetchLock.Lock()
defer c.fetchLock.Unlock()
// Only remove a fetchHandle if it's YOUR fetchHandle.
handle, ok := c.fetchHandles[key]
if !ok {
return
}
if handle.id == fetchID {
delete(c.fetchHandles, key)
}
}
func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib/ttlcache"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
// Test a basic Get with no indexes (and therefore no blocking queries).
@ -1750,12 +1751,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
require.NoError(t, err)
require.Equal(t, true, result)
// ensure that the entry is fetching again
waitUntilFetching := func(expectValue bool) {
retry.Run(t, func(t *retry.R) {
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
require.True(t, ok)
if expectValue {
require.True(t, entry.Fetching)
c.entriesLock.Unlock()
} else {
require.False(t, entry.Fetching)
}
})
}
// ensure that the entry is fetching again
waitUntilFetching(true)
requestChan := make(chan error)
@ -1789,11 +1800,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
}
// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()
waitUntilFetching(true)
// background a call that will wait for a newer version - will result in an acl not found error
go getError(5)
@ -1814,11 +1821,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
// ensure that the ACL not found error killed off the background refresh
// but didn't remove it from the cache
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.False(t, entry.Fetching)
c.entriesLock.Unlock()
waitUntilFetching(false)
}
type fakeType struct {