From 9f41cc4a252785b02eaa24eca4170b2b12e6a3d4 Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Mon, 17 Oct 2022 14:38:10 -0500
Subject: [PATCH] cache: prevent goroutine leak in agent cache (#14908)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A bug was discovered in the error handling code of the Agent cache subsystem:

1. NotifyCallback calls notifyBlockingQuery, which calls getWithIndex in a
   loop (backing off on error for up to 1 minute).
2. getWithIndex calls fetch if there is no valid entry in the cache.
3. fetch starts a goroutine that calls Fetch on the cache-type, waits for a
   while (again with backoff of up to 1 minute on errors), and then calls
   fetch again to trigger a refresh.

The end result is that every minute notifyBlockingQuery spawns an ancestry of
goroutines that essentially lives forever.

This PR ensures that the goroutine started by `fetch` cancels any prior
goroutine spawned by the same line of code for the same key.

In isolated testing where a cache type was tweaked to error indefinitely,
this patch prevented goroutine counts from skyrocketing.
---
 .changelog/14908.txt      |  3 ++
 agent/cache/cache.go      | 81 +++++++++++++++++++++++++++++++++++----
 agent/cache/cache_test.go | 33 ++++++++--------
 3 files changed, 94 insertions(+), 23 deletions(-)
 create mode 100644 .changelog/14908.txt

diff --git a/.changelog/14908.txt b/.changelog/14908.txt
new file mode 100644
index 000000000..154337a7e
--- /dev/null
+++ b/.changelog/14908.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+cache: prevent goroutine leak in agent cache
+```
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index 0252c2dcf..ea537cc9e 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -139,6 +139,10 @@ type Cache struct {
 	entries           map[string]cacheEntry
 	entriesExpiryHeap *ttlcache.ExpiryHeap
 
+	fetchLock    sync.Mutex
+	lastFetchID  uint64
+	fetchHandles map[string]fetchHandle
+
 	// stopped is used as an atomic flag to signal that the Cache has been
 	// discarded so background fetches and expiry processing should stop.
 	stopped uint32
@@ -150,6 +154,11 @@ type Cache struct {
 	rateLimitCancel context.CancelFunc
 }
 
+type fetchHandle struct {
+	id     uint64
+	stopCh chan struct{}
+}
+
 // typeEntry is a single type that is registered with a Cache.
 type typeEntry struct {
 	// Name that was used to register the Type
@@ -225,6 +234,7 @@ func New(options Options) *Cache {
 		types:             make(map[string]typeEntry),
 		entries:           make(map[string]cacheEntry),
 		entriesExpiryHeap: ttlcache.NewExpiryHeap(),
+		fetchHandles:      make(map[string]fetchHandle),
 		stopCh:            make(chan struct{}),
 		options:           options,
 		rateLimitContext:  ctx,
@@ -614,8 +624,18 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 	metrics.SetGauge([]string{"cache", "entries_count"},
 		float32(len(c.entries)))
 	tEntry := r.TypeEntry
-	// The actual Fetch must be performed in a goroutine.
-	go func() {
+
+	// The actual Fetch must be performed in a goroutine. Ensure that we only
+	// have one in-flight at a time, but don't use a deferred
+	// context.WithCancel style termination so that these things outlive their
+	// requester.
+	//
+	// By the time we get here the system WANTS to make a replacement fetcher, so
+	// we terminate the prior one and replace it.
+	handle := c.getOrReplaceFetchHandle(key)
+	go func(handle fetchHandle) {
+		defer c.deleteFetchHandle(key, handle.id)
+
 		// If we have background refresh and currently are in "disconnected" state,
 		// waiting for a response might mean we mark our results as stale for up to
 		// 10 minutes (max blocking timeout) after connection is restored. To reduce
@@ -666,6 +686,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			connectedTimer.Stop()
 		}
 
+		// If we were stopped while waiting on a blocking query now would be a
+		// good time to detect that.
+		select {
+		case <-handle.stopCh:
+			return
+		default:
+		}
+
 		// Copy the existing entry to start.
 		newEntry := entry
 		newEntry.Fetching = false
@@ -820,13 +848,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			}
 
 			// If we're over the attempt minimum, start an exponential backoff.
-			if wait := backOffWait(attempt); wait > 0 {
-				time.Sleep(wait)
-			}
+			wait := backOffWait(attempt)
 
 			// If we have a timer, wait for it
-			if tEntry.Opts.RefreshTimer > 0 {
-				time.Sleep(tEntry.Opts.RefreshTimer)
+			wait += tEntry.Opts.RefreshTimer
+
+			select {
+			case <-time.After(wait):
+			case <-handle.stopCh:
+				return
 			}
 
 			// Trigger. The "allowNew" field is false because in the time we were
@@ -836,11 +866,46 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			r.Info.MinIndex = 0
 			c.fetch(key, r, false, attempt, true)
 		}
-	}()
+	}(handle)
 
 	return entry.Waiter
 }
 
+func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
+	c.fetchLock.Lock()
+	defer c.fetchLock.Unlock()
+
+	if prevHandle, ok := c.fetchHandles[key]; ok {
+		close(prevHandle.stopCh)
+	}
+
+	c.lastFetchID++
+
+	handle := fetchHandle{
+		id:     c.lastFetchID,
+		stopCh: make(chan struct{}),
+	}
+
+	c.fetchHandles[key] = handle
+
+	return handle
+}
+
+func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
+	c.fetchLock.Lock()
+	defer c.fetchLock.Unlock()
+
+	// Only remove a fetchHandle if it's YOUR fetchHandle.
+	handle, ok := c.fetchHandles[key]
+	if !ok {
+		return
+	}
+
+	if handle.id == fetchID {
+		delete(c.fetchHandles, key)
+	}
+}
+
 func backOffWait(failures uint) time.Duration {
 	if failures > CacheRefreshBackoffMin {
 		shift := failures - CacheRefreshBackoffMin
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index a93969c2c..6f8805be0 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/lib/ttlcache"
 	"github.com/hashicorp/consul/sdk/testutil"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
 )
 
 // Test a basic Get with no indexes (and therefore no blocking queries).
@@ -1750,12 +1751,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, true, result)
 
+	waitUntilFetching := func(expectValue bool) {
+		retry.Run(t, func(t *retry.R) {
+			c.entriesLock.Lock()
+			defer c.entriesLock.Unlock()
+			entry, ok := c.entries[key]
+			require.True(t, ok)
+			if expectValue {
+				require.True(t, entry.Fetching)
+			} else {
+				require.False(t, entry.Fetching)
+			}
+		})
+	}
+
 	// ensure that the entry is fetching again
-	c.entriesLock.Lock()
-	entry, ok := c.entries[key]
-	require.True(t, ok)
-	require.True(t, entry.Fetching)
-	c.entriesLock.Unlock()
+	waitUntilFetching(true)
 
 	requestChan := make(chan error)
 
@@ -1789,11 +1800,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 	}
 
 	// ensure that the entry is fetching again
-	c.entriesLock.Lock()
-	entry, ok = c.entries[key]
-	require.True(t, ok)
-	require.True(t, entry.Fetching)
-	c.entriesLock.Unlock()
+	waitUntilFetching(true)
 
 	// background a call that will wait for a newer version - will result in an acl not found error
 	go getError(5)
@@ -1814,11 +1821,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 
 	// ensure that the ACL not found error killed off the background refresh
 	// but didn't remove it from the cache
-	c.entriesLock.Lock()
-	entry, ok = c.entries[key]
-	require.True(t, ok)
-	require.False(t, entry.Fetching)
-	c.entriesLock.Unlock()
+	waitUntilFetching(false)
 }
 
 type fakeType struct {
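To illustrate the pattern the patch introduces, here is a minimal, self-contained sketch, not the Consul implementation itself; the `fetcherSet` type, its method names, the key, and the timings are invented for illustration. Each key owns at most one background fetcher: starting a replacement closes the previous fetcher's stop channel, the retry loop selects on that channel instead of sleeping, and a fetcher only deletes its own handle on exit, identified by a monotonically increasing ID.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchHandle identifies one background fetcher and carries its stop channel.
type fetchHandle struct {
	id     uint64
	stopCh chan struct{}
}

// fetcherSet tracks at most one in-flight fetcher per key.
type fetcherSet struct {
	mu      sync.Mutex
	lastID  uint64
	handles map[string]fetchHandle
}

func newFetcherSet() *fetcherSet {
	return &fetcherSet{handles: make(map[string]fetchHandle)}
}

// getOrReplace stops any prior fetcher for key and registers a new handle.
func (fs *fetcherSet) getOrReplace(key string) fetchHandle {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	if prev, ok := fs.handles[key]; ok {
		close(prev.stopCh) // tell the old goroutine to exit instead of leaking it
	}

	fs.lastID++
	h := fetchHandle{id: fs.lastID, stopCh: make(chan struct{})}
	fs.handles[key] = h
	return h
}

// done removes the handle only if it still belongs to the exiting fetcher.
func (fs *fetcherSet) done(key string, id uint64) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	if h, ok := fs.handles[key]; ok && h.id == id {
		delete(fs.handles, key)
	}
}

// fetch mimics the retry loop: instead of time.Sleep between attempts it
// selects on the stop channel, so a replaced fetcher exits promptly.
func (fs *fetcherSet) fetch(key string, wait time.Duration) {
	h := fs.getOrReplace(key)
	go func() {
		defer fs.done(key, h.id)
		for {
			// ... perform the blocking query / Fetch call here ...
			select {
			case <-time.After(wait):
				// backed off; loop around and try again
			case <-h.stopCh:
				return // a newer fetcher took over this key; don't leak
			}
		}
	}()
}

func main() {
	fs := newFetcherSet()
	fs.fetch("service:web", 50*time.Millisecond)
	fs.fetch("service:web", 50*time.Millisecond) // replaces (and stops) the first fetcher
	time.Sleep(200 * time.Millisecond)

	fs.mu.Lock()
	fmt.Println("fetchers tracked for the key:", len(fs.handles))
	fs.mu.Unlock()
}
```

The ID check in done mirrors deleteFetchHandle in the patch: without it, a long-dead fetcher could delete the handle belonging to its replacement and reopen the window for duplicate goroutines.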