diff --git a/.changelog/14908.txt b/.changelog/14908.txt new file mode 100644 index 000000000..154337a7e --- /dev/null +++ b/.changelog/14908.txt @@ -0,0 +1,3 @@ +```release-note:bug +cache: prevent goroutine leak in agent cache +``` diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 0252c2dcf..ea537cc9e 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -139,6 +139,10 @@ type Cache struct { entries map[string]cacheEntry entriesExpiryHeap *ttlcache.ExpiryHeap + fetchLock sync.Mutex + lastFetchID uint64 + fetchHandles map[string]fetchHandle + // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. stopped uint32 @@ -150,6 +154,11 @@ type Cache struct { rateLimitCancel context.CancelFunc } +type fetchHandle struct { + id uint64 + stopCh chan struct{} +} + // typeEntry is a single type that is registered with a Cache. type typeEntry struct { // Name that was used to register the Type @@ -225,6 +234,7 @@ func New(options Options) *Cache { types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), entriesExpiryHeap: ttlcache.NewExpiryHeap(), + fetchHandles: make(map[string]fetchHandle), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -614,8 +624,18 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries))) tEntry := r.TypeEntry - // The actual Fetch must be performed in a goroutine. - go func() { + + // The actual Fetch must be performed in a goroutine. Ensure that we only + // have one in-flight at a time, but don't use a deferred + // context.WithCancel style termination so that these things outlive their + // requester. + // + // By the time we get here the system WANTS to make a replacement fetcher, so + // we terminate the prior one and replace it. + handle := c.getOrReplaceFetchHandle(key) + go func(handle fetchHandle) { + defer c.deleteFetchHandle(key, handle.id) + // If we have background refresh and currently are in "disconnected" state, // waiting for a response might mean we mark our results as stale for up to // 10 minutes (max blocking timeout) after connection is restored. To reduce @@ -666,6 +686,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign connectedTimer.Stop() } + // If we were stopped while waiting on a blocking query now would be a + // good time to detect that. + select { + case <-handle.stopCh: + return + default: + } + // Copy the existing entry to start. newEntry := entry newEntry.Fetching = false @@ -820,13 +848,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign } // If we're over the attempt minimum, start an exponential backoff. - if wait := backOffWait(attempt); wait > 0 { - time.Sleep(wait) - } + wait := backOffWait(attempt) // If we have a timer, wait for it - if tEntry.Opts.RefreshTimer > 0 { - time.Sleep(tEntry.Opts.RefreshTimer) + wait += tEntry.Opts.RefreshTimer + + select { + case <-time.After(wait): + case <-handle.stopCh: + return } // Trigger. The "allowNew" field is false because in the time we were @@ -836,11 +866,46 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign r.Info.MinIndex = 0 c.fetch(key, r, false, attempt, true) } - }() + }(handle) return entry.Waiter } +func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle { + c.fetchLock.Lock() + defer c.fetchLock.Unlock() + + if prevHandle, ok := c.fetchHandles[key]; ok { + close(prevHandle.stopCh) + } + + c.lastFetchID++ + + handle := fetchHandle{ + id: c.lastFetchID, + stopCh: make(chan struct{}), + } + + c.fetchHandles[key] = handle + + return handle +} + +func (c *Cache) deleteFetchHandle(key string, fetchID uint64) { + c.fetchLock.Lock() + defer c.fetchLock.Unlock() + + // Only remove a fetchHandle if it's YOUR fetchHandle. + handle, ok := c.fetchHandles[key] + if !ok { + return + } + + if handle.id == fetchID { + delete(c.fetchHandles, key) + } +} + func backOffWait(failures uint) time.Duration { if failures > CacheRefreshBackoffMin { shift := failures - CacheRefreshBackoffMin diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index a93969c2c..6f8805be0 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" ) // Test a basic Get with no indexes (and therefore no blocking queries). @@ -1750,12 +1751,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) { require.NoError(t, err) require.Equal(t, true, result) + waitUntilFetching := func(expectValue bool) { + retry.Run(t, func(t *retry.R) { + c.entriesLock.Lock() + defer c.entriesLock.Unlock() + entry, ok := c.entries[key] + require.True(t, ok) + if expectValue { + require.True(t, entry.Fetching) + } else { + require.False(t, entry.Fetching) + } + }) + } + // ensure that the entry is fetching again - c.entriesLock.Lock() - entry, ok := c.entries[key] - require.True(t, ok) - require.True(t, entry.Fetching) - c.entriesLock.Unlock() + waitUntilFetching(true) requestChan := make(chan error) @@ -1789,11 +1800,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) { } // ensure that the entry is fetching again - c.entriesLock.Lock() - entry, ok = c.entries[key] - require.True(t, ok) - require.True(t, entry.Fetching) - c.entriesLock.Unlock() + waitUntilFetching(true) // background a call that will wait for a newer version - will result in an acl not found error go getError(5) @@ -1814,11 +1821,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) { // ensure that the ACL not found error killed off the background refresh // but didn't remove it from the cache - c.entriesLock.Lock() - entry, ok = c.entries[key] - require.True(t, ok) - require.False(t, entry.Fetching) - c.entriesLock.Unlock() + waitUntilFetching(false) } type fakeType struct {