From a01936442c4484967ad8165d126944ea47e4d6cc Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Tue, 25 Oct 2022 10:27:26 -0500
Subject: [PATCH] cache: refactor agent cache fetching to prevent unnecessary
 fetches on error (#14956)

This continues the work done in #14908, where a crude solution to prevent a
goroutine leak was implemented. The former code would launch a perpetual
goroutine family every iteration (+1 +1), and the fixed code simply had each
new goroutine family first cancel the prior one to prevent the leak
(-1 +1 == 0).

This PR refactors this code completely to:

- make it more understandable
- remove the recursion-via-goroutine strangeness
- prevent unnecessary RPC fetches when the prior one has errored

The core issue arose from a conflation of the entry.Fetching field to mean
both:

- (1) there is an RPC (blocking query) in flight right now
- (2) there is a goroutine running to manage the RPC fetch retry loop

The problem is that the goroutine-leak-avoidance check would treat Fetching
like (2), but within the body of a goroutine it would flip that boolean back
to false before the retry sleep. This would cause a new chain of goroutines
to launch, which #14908 corrected crudely.

The refactored code uses a plain for-loop and changes the semantics to track
"is there a goroutine associated with this cache entry" instead of the former
conflated meaning. We use a unique uint64 identity per goroutine, rather than
a boolean, so that a goroutine orphaned when the expiry loop deletes its cache
entry while it is still running can detect that it has been superseded once
the entry is recreated, and terminate rather than do double work.
---
 .changelog/14956.txt                      |   3 +
 agent/agent_test.go                       |  12 +-
 agent/cache/cache.go                      | 308 +++++++++---------
 agent/cache/cache_test.go                 |  33 +-
 agent/cache/entry.go                      |   6 +-
 agent/cache/watch.go                      |   4 +-
 .../TestRuntimeConfig_Sanitize.golden     |   2 +
 agent/testagent.go                        |   3 +
 8 files changed, 196 insertions(+), 175 deletions(-)
 create mode 100644 .changelog/14956.txt

diff --git a/.changelog/14956.txt b/.changelog/14956.txt
new file mode 100644
index 000000000..fac4bc12e
--- /dev/null
+++ b/.changelog/14956.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+cache: refactor agent cache fetching to prevent unnecessary fetches on error
+```
diff --git a/agent/agent_test.go b/agent/agent_test.go
index 2c941e905..ff36a4223 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -4795,19 +4795,19 @@ services {
 	deadlineCh := time.After(10 * time.Second)
 	start := time.Now()
-LOOP:
 	for {
 		select {
 		case evt := <-ch:
 			// We may receive several notifications of an error until we get the
 			// first successful reply.
 			require.Equal(t, "foo", evt.CorrelationID)
-			if evt.Err != nil {
-				break LOOP
+			if evt.Err == nil {
+				require.NoError(t, evt.Err)
+				require.NotNil(t, evt.Result)
+				t.Logf("took %s to get first success", time.Since(start))
+				return
 			}
-			require.NoError(t, evt.Err)
-			require.NotNil(t, evt.Result)
-			t.Logf("took %s to get first success", time.Since(start))
+			t.Logf("saw error: %v", evt.Err)
 		case <-deadlineCh:
 			t.Fatal("did not get notified successfully")
 		}
 	}
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index ea537cc9e..55b1654af 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -84,8 +84,8 @@ var Counters = []prometheus.CounterDefinition{
 // Constants related to refresh backoff. We probably don't ever need to
 // make these configurable knobs since they primarily exist to lower load.
 const (
-	CacheRefreshBackoffMin = 3               // 3 attempts before backing off
-	CacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
+	DefaultCacheRefreshBackoffMin = 3               // 3 attempts before backing off
+	DefaultCacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
 
 	// The following constants are default values for the cache entry
 	// rate limiter settings.
@@ -138,10 +138,7 @@ type Cache struct {
 	entriesLock       sync.RWMutex
 	entries           map[string]cacheEntry
 	entriesExpiryHeap *ttlcache.ExpiryHeap
-
-	fetchLock    sync.Mutex
-	lastFetchID  uint64
-	fetchHandles map[string]fetchHandle
+	lastGoroutineID   uint64
 
 	// stopped is used as an atomic flag to signal that the Cache has been
 	// discarded so background fetches and expiry processing should stop.
@@ -154,11 +151,6 @@ type Cache struct {
 	rateLimitCancel context.CancelFunc
 }
 
-type fetchHandle struct {
-	id     uint64
-	stopCh chan struct{}
-}
-
 // typeEntry is a single type that is registered with a Cache.
 type typeEntry struct {
 	// Name that was used to register the Type
@@ -204,6 +196,13 @@ type Options struct {
 	EntryFetchMaxBurst int
 	// EntryFetchRate represents the max calls/sec for a single cache entry
 	EntryFetchRate rate.Limit
+
+	// CacheRefreshBackoffMin is the number of failed fetch attempts tolerated before backing off.
+	// Mostly configurable just for testing.
+	CacheRefreshBackoffMin uint
+	// CacheRefreshMaxWait is the maximum backoff wait time.
+	// Mostly configurable just for testing.
+	CacheRefreshMaxWait time.Duration
 }
 
 // Equal returns true if both options are equivalent
@@ -219,6 +218,12 @@ func applyDefaultValuesOnOptions(options Options) Options {
 	if options.EntryFetchMaxBurst == 0 {
 		options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
 	}
+	if options.CacheRefreshBackoffMin == 0 {
+		options.CacheRefreshBackoffMin = DefaultCacheRefreshBackoffMin
+	}
+	if options.CacheRefreshMaxWait == 0 {
+		options.CacheRefreshMaxWait = DefaultCacheRefreshMaxWait
+	}
 	if options.Logger == nil {
 		options.Logger = hclog.New(nil)
 	}
@@ -234,7 +239,6 @@ func New(options Options) *Cache {
 		types:             make(map[string]typeEntry),
 		entries:           make(map[string]cacheEntry),
 		entriesExpiryHeap: ttlcache.NewExpiryHeap(),
-		fetchHandles:      make(map[string]fetchHandle),
 		stopCh:            make(chan struct{}),
 		options:           options,
 		rateLimitContext:  ctx,
@@ -404,11 +408,23 @@ func (c *Cache) getEntryLocked(
 	// Check if re-validate is requested. If so the first time round the
 	// loop is not a hit but subsequent ones should be treated normally.
 	if !tEntry.Opts.Refresh && info.MustRevalidate {
-		if entry.Fetching {
-			// There is an active blocking query for this data, which has not
-			// returned. We can logically deduce that the contents of the cache
-			// are actually current, and we can simply return this while
-			// leaving the blocking query alone.
+		// It is important to note that this block ONLY applies when we are not
+		// in indefinite refresh mode (where the underlying goroutine will
+		// continue to re-query for data).
+		//
+		// In this mode goroutines have a 1:1 relationship to RPCs that get
+		// executed, and importantly they DO NOT SLEEP after executing.
+		//
+		// This means that a running goroutine for this cache entry strongly
+		// implies that the RPC has not yet completed, which is why this
+		// check works for the revalidation-avoidance optimization here.
+		if entry.GoroutineID != 0 {
+			// There is an active goroutine performing a blocking query for
+			// this data, which has not returned.
+ // + // We can logically deduce that the contents of the cache are + // actually current, and we can simply return this while leaving + // the blocking query alone. return true, true, entry } return true, false, entry @@ -538,7 +554,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh := c.fetch(key, r, true, 0, false) + waiterCh := c.fetch(key, r) // No longer our first time through first = false @@ -565,46 +581,36 @@ func makeEntryKey(t, dc, peerName, token, key string) string { return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) } -// fetch triggers a new background fetch for the given Request. If a -// background fetch is already running for a matching Request, the waiter -// channel for that request is returned. The effect of this is that there -// is only ever one blocking query for any matching requests. -// -// If allowNew is true then the fetch should create the cache entry -// if it doesn't exist. If this is false, then fetch will do nothing -// if the entry doesn't exist. This latter case is to support refreshing. -func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} { - // We acquire a write lock because we may have to set Fetching to true. +// fetch triggers a new background fetch for the given Request. If a background +// fetch is already running or a goroutine to manage that still exists for a +// matching Request, the waiter channel for that request is returned. The +// effect of this is that there is only ever one blocking query and goroutine +// for any matching requests. +func (c *Cache) fetch(key string, r getOptions) <-chan struct{} { c.entriesLock.Lock() defer c.entriesLock.Unlock() + ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info) - // This handles the case where a fetch succeeded after checking for its existence in - // getWithIndex. This ensures that we don't miss updates. - if ok && entryValid && !ignoreExisting { + switch { + case ok && entryValid: + // This handles the case where a fetch succeeded after checking for its + // existence in getWithIndex. This ensures that we don't miss updates. ch := make(chan struct{}) close(ch) return ch - } - // If we aren't allowing new values and we don't have an existing value, - // return immediately. We return an immediately-closed channel so nothing - // blocks. - if !ok && !allowNew { - ch := make(chan struct{}) - close(ch) - return ch - } - - // If we already have an entry and it is actively fetching, then return - // the currently active waiter. - if ok && entry.Fetching { + case ok && entry.GoroutineID != 0: + // If we already have an entry and there's a goroutine to keep it + // refreshed then don't spawn another one to do the same work. + // + // Return the currently active waiter. return entry.Waiter - } - // If we don't have an entry, then create it. The entry must be marked - // as invalid so that it isn't returned as a valid value for a zero index. - if !ok { + case !ok: + // If we don't have an entry, then create it. The entry must be marked + // as invalid so that it isn't returned as a valid value for a zero + // index. 
 		entry = cacheEntry{
 			Valid:  false,
 			Waiter: make(chan struct{}),
@@ -615,27 +621,100 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 		}
 	}
 
-	// Set that we're fetching to true, which makes it so that future
-	// identical calls to fetch will return the same waiter rather than
-	// perform multiple fetches.
-	entry.Fetching = true
+	// Assign each background fetching goroutine a unique ID and fingerprint
+	// the cache entry with the same ID. This way if the cache entry is ever
+	// cleaned up due to expiry and later recreated the old goroutine can
+	// detect that and terminate rather than leak and do double work.
+	c.lastGoroutineID++
+	entry.GoroutineID = c.lastGoroutineID
 	c.entries[key] = entry
 	metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
 	metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
 
+	// The actual Fetch must be performed in a goroutine.
+	go c.launchBackgroundFetcher(entry.GoroutineID, key, r)
+
+	return entry.Waiter
+}
+
+func (c *Cache) launchBackgroundFetcher(goroutineID uint64, key string, r getOptions) {
+	defer func() {
+		c.entriesLock.Lock()
+		defer c.entriesLock.Unlock()
+		entry, ok := c.entries[key]
+		if ok && entry.GoroutineID == goroutineID {
+			entry.GoroutineID = 0
+			c.entries[key] = entry
+		}
+	}()
+
+	var attempt uint
+	for {
+		shouldStop, shouldBackoff := c.runBackgroundFetcherOnce(goroutineID, key, r)
+		if shouldStop {
+			return
+		}
+
+		if shouldBackoff {
+			attempt++
+		} else {
+			attempt = 0
+		}
+		// If we're over the attempt minimum, start an exponential backoff.
+		wait := backOffWait(c.options, attempt)
+
+		// If we have a timer, wait for it
+		wait += r.TypeEntry.Opts.RefreshTimer
+
+		select {
+		case <-time.After(wait):
+		case <-c.stopCh:
+			return // Check if cache was stopped
+		}
+
+		// Trigger.
+		r.Info.MustRevalidate = false
+		r.Info.MinIndex = 0
+
+		// We acquire a write lock because we may have to re-store the entry.
+		c.entriesLock.Lock()
+
+		entry, ok := c.entries[key]
+		if !ok || entry.GoroutineID != goroutineID {
+			// If the cache entry has been removed, return immediately.
+			//
+			// Also if this goroutine has been orphaned because a newer
+			// goroutine now owns the entry, return immediately.
+			//
+			// In either case there is nothing left for this loop to refresh.
+			c.entriesLock.Unlock()
+			return
+		}
+
+		c.entries[key] = entry
+		metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
+		metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
+		c.entriesLock.Unlock()
+	}
+}
+
+func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOptions) (shouldStop, shouldBackoff bool) {
+	// Freshly re-read this, rather than relying upon the caller to fetch it
+	// and pass it in.
+	c.entriesLock.RLock()
+	entry, ok := c.entries[key]
+	c.entriesLock.RUnlock()
+
+	if !ok || entry.GoroutineID != goroutineID {
+		// If we don't have an existing entry, return immediately.
+		//
+		// Likewise if this goroutine has been orphaned and replaced, return
+		// immediately.
+		return true, false
+	}
+
 	tEntry := r.TypeEntry
-
-	// The actual Fetch must be performed in a goroutine. Ensure that we only
-	// have one in-flight at a time, but don't use a deferred
-	// context.WithCancel style termination so that these things outlive their
-	// requester.
-	//
-	// By the time we get here the system WANTS to make a replacement fetcher, so
-	// we terminate the prior one and replace it.
-	handle := c.getOrReplaceFetchHandle(key)
-	go func(handle fetchHandle) {
-		defer c.deleteFetchHandle(key, handle.id)
-
+	{ // NOTE: this indentation is here to facilitate the PR review diff only
 		// If we have background refresh and currently are in "disconnected" state,
 		// waiting for a response might mean we mark our results as stale for up to
 		// 10 minutes (max blocking timeout) after connection is restored. To reduce
@@ -649,7 +728,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 				c.entriesLock.Lock()
 				defer c.entriesLock.Unlock()
 				entry, ok := c.entries[key]
-				if !ok || entry.RefreshLostContact.IsZero() {
+				if !ok || entry.RefreshLostContact.IsZero() || entry.GoroutineID != goroutineID {
 					return
 				}
 				entry.RefreshLostContact = time.Time{}
@@ -673,12 +752,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 				Index: entry.Index,
 			}
 		}
+
 		if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
 			if connectedTimer != nil {
 				connectedTimer.Stop()
 			}
 			entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
-			return
+			// NOTE: this can only happen when the entire cache is being
+			// shut down and isn't something that can happen normally.
+			return true, false
 		}
 		// Start building the new entry by blocking on the fetch.
 		result, err := r.Fetch(fOpts)
@@ -686,17 +768,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			connectedTimer.Stop()
 		}
 
-		// If we were stopped while waiting on a blocking query now would be a
-		// good time to detect that.
-		select {
-		case <-handle.stopCh:
-			return
-		default:
-		}
-
 		// Copy the existing entry to start.
 		newEntry := entry
-		newEntry.Fetching = false
 
 		// Importantly, always reset the Error. Having both Error and a Value that
 		// are non-nil is allowed in the cache entry but it indicates that the Error
@@ -752,7 +825,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 
 		if result.Index > 0 {
 			// Reset the attempts counter so we don't have any backoff
-			attempt = 0
+			shouldBackoff = false
 		} else {
 			// Result having a zero index is an implicit error case. There was no
 			// actual error but it implies the RPC found in index (nothing written
@@ -767,7 +840,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			// state it can be considered a bug in the RPC implementation (to ever
 			// return a zero index) however since it can happen this is a safety net
 			// for the future.
-			attempt++
+			shouldBackoff = true
 		}
 
 		// If we have refresh active, this successful response means cache is now
@@ -787,7 +860,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			metrics.IncrCounterWithLabels([]string{"cache", tEntry.Name, "fetch_error"}, 1, labels)
 
 			// Increment attempt counter
-			attempt++
+			shouldBackoff = true
 
 			// If we are refreshing and just failed, update the lost contact time as
 			// our cache will be stale until we get successfully reconnected. We only
@@ -804,7 +877,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 
 		// Set our entry
 		c.entriesLock.Lock()
-		if _, ok := c.entries[key]; !ok {
+		if currEntry, ok := c.entries[key]; !ok || currEntry.GoroutineID != goroutineID {
 			// This entry was evicted during our fetch. DON'T re-insert it or fall
 			// through to the refresh loop below otherwise it will live forever! In
 			// theory there should not be any Get calls waiting on entry.Waiter since
@@ -817,7 +890,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 
 			// Trigger any waiters that are around.
 			close(entry.Waiter)
-			return
+			return true, false
 		}
 
 		// If this is a new entry (not in the heap yet), then setup the
@@ -842,79 +915,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 		// request back up again shortly but in the general case this prevents
 		// spamming the logs with tons of ACL not found errors for days.
 		if tEntry.Opts.Refresh && !preventRefresh {
-			// Check if cache was stopped
-			if atomic.LoadUint32(&c.stopped) == 1 {
-				return
-			}
-
-			// If we're over the attempt minimum, start an exponential backoff.
-			wait := backOffWait(attempt)
-
-			// If we have a timer, wait for it
-			wait += tEntry.Opts.RefreshTimer
-
-			select {
-			case <-time.After(wait):
-			case <-handle.stopCh:
-				return
-			}
-
-			// Trigger. The "allowNew" field is false because in the time we were
-			// waiting to refresh we may have expired and got evicted. If that
-			// happened, we don't want to create a new entry.
-			r.Info.MustRevalidate = false
-			r.Info.MinIndex = 0
-			c.fetch(key, r, false, attempt, true)
+			return false, shouldBackoff
 		}
-	}(handle)
+	}
 
-	return entry.Waiter
+	return true, false
 }
 
-func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
-	c.fetchLock.Lock()
-	defer c.fetchLock.Unlock()
-
-	if prevHandle, ok := c.fetchHandles[key]; ok {
-		close(prevHandle.stopCh)
-	}
-
-	c.lastFetchID++
-
-	handle := fetchHandle{
-		id:     c.lastFetchID,
-		stopCh: make(chan struct{}),
-	}
-
-	c.fetchHandles[key] = handle
-
-	return handle
-}
-
-func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
-	c.fetchLock.Lock()
-	defer c.fetchLock.Unlock()
-
-	// Only remove a fetchHandle if it's YOUR fetchHandle.
-	handle, ok := c.fetchHandles[key]
-	if !ok {
-		return
-	}
-
-	if handle.id == fetchID {
-		delete(c.fetchHandles, key)
-	}
-}
-
-func backOffWait(failures uint) time.Duration {
-	if failures > CacheRefreshBackoffMin {
-		shift := failures - CacheRefreshBackoffMin
-		waitTime := CacheRefreshMaxWait
+func backOffWait(opts Options, failures uint) time.Duration {
+	if failures > opts.CacheRefreshBackoffMin {
+		shift := failures - opts.CacheRefreshBackoffMin
+		waitTime := opts.CacheRefreshMaxWait
 		if shift < 31 {
 			waitTime = (1 << shift) * time.Second
 		}
-		if waitTime > CacheRefreshMaxWait {
-			waitTime = CacheRefreshMaxWait
+		if waitTime > opts.CacheRefreshMaxWait {
+			waitTime = opts.CacheRefreshMaxWait
 		}
 		return waitTime + lib.RandomStagger(waitTime)
 	}
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index 6f8805be0..98b04ee9a 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -18,7 +18,6 @@ import (
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/lib/ttlcache"
 	"github.com/hashicorp/consul/sdk/testutil"
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 )
 
 // Test a basic Get with no indexes (and therefore no blocking queries).
@@ -1751,22 +1750,12 @@ func TestCache_RefreshLifeCycle(t *testing.T) { require.NoError(t, err) require.Equal(t, true, result) - waitUntilFetching := func(expectValue bool) { - retry.Run(t, func(t *retry.R) { - c.entriesLock.Lock() - defer c.entriesLock.Unlock() - entry, ok := c.entries[key] - require.True(t, ok) - if expectValue { - require.True(t, entry.Fetching) - } else { - require.False(t, entry.Fetching) - } - }) - } - // ensure that the entry is fetching again - waitUntilFetching(true) + c.entriesLock.Lock() + entry, ok := c.entries[key] + require.True(t, ok) + require.True(t, entry.GoroutineID > 0) + c.entriesLock.Unlock() requestChan := make(chan error) @@ -1800,7 +1789,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) { } // ensure that the entry is fetching again - waitUntilFetching(true) + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.True(t, entry.GoroutineID > 0) + c.entriesLock.Unlock() // background a call that will wait for a newer version - will result in an acl not found error go getError(5) @@ -1821,7 +1814,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) { // ensure that the ACL not found error killed off the background refresh // but didn't remove it from the cache - waitUntilFetching(false) + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.False(t, entry.GoroutineID > 0) + c.entriesLock.Unlock() } type fakeType struct { diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 0c71e9443..7130381de 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -26,9 +26,9 @@ type cacheEntry struct { Index uint64 // Metadata that is used for internal accounting - Valid bool // True if the Value is set - Fetching bool // True if a fetch is already active - Waiter chan struct{} // Closed when this entry is invalidated + Valid bool // True if the Value is set + GoroutineID uint64 // Nonzero if a fetch goroutine is running. + Waiter chan struct{} // Closed when this entry is invalidated // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the diff --git a/agent/cache/watch.go b/agent/cache/watch.go index f99f85c04..a981c01e4 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -136,7 +136,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati failures = 0 } else { failures++ - wait = backOffWait(failures) + wait = backOffWait(c.options, failures) c.options.Logger. With("error", err). @@ -223,7 +223,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio // as this would eliminate the single-flighting of these requests in the cache and // the efficiencies gained by it. if failures > 0 { - wait = backOffWait(failures) + wait = backOffWait(c.options, failures) } else { // Calculate when the cached data's Age will get too stale and // need to be re-queried. 
When the data's Age already exceeds the
diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
index a4c35e4b9..23aff3a88 100644
--- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
+++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -78,6 +78,8 @@
     "BootstrapExpect": 0,
     "BuildDate": "2019-11-20 05:00:00 +0000 UTC",
     "Cache": {
+        "CacheRefreshBackoffMin": 0,
+        "CacheRefreshMaxWait": "0s",
         "EntryFetchMaxBurst": 42,
         "EntryFetchRate": 0.334,
         "Logger": null
diff --git a/agent/testagent.go b/agent/testagent.go
index 7cffe9225..9642fca66 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -216,6 +216,9 @@ func (a *TestAgent) Start(t *testing.T) error {
 		} else {
 			result.RuntimeConfig.Telemetry.Disable = true
 		}
+		// Lower the maximum backoff period of a cache refresh just for
+		// tests; see #14956 for more.
+		result.RuntimeConfig.Cache.CacheRefreshMaxWait = 1 * time.Second
 	}
 	return result, err
 }
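The goroutine-ownership pattern this patch introduces (stamp the entry with a unique ID, have the goroutine re-check that ID before every unit of work) generalizes beyond Consul's cache. Below is a minimal, self-contained Go sketch of the same idea; it is not Consul code, and every name in it (store, entry, ownerID, ensureFetcher, refreshLoop) is illustrative only:

```go
// Minimal sketch of the ownership-ID pattern from #14956 (not Consul code).
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	ownerID uint64 // nonzero while a background goroutine owns this entry
	value   int
}

type store struct {
	mu     sync.Mutex
	lastID uint64
	data   map[string]entry
}

// ensureFetcher starts a background refresh loop for key, unless a live
// goroutine already owns the entry (mirroring the entry.GoroutineID != 0 check).
func (s *store) ensureFetcher(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.data[key]
	if e.ownerID != 0 {
		return // an owner already exists; don't spawn a duplicate
	}
	s.lastID++ // mirrors c.lastGoroutineID++
	e.ownerID = s.lastID
	s.data[key] = e
	go s.refreshLoop(e.ownerID, key)
}

// refreshLoop re-checks ownership on every iteration and exits as soon as
// the entry is gone or has been stamped with a newer owner ID.
func (s *store) refreshLoop(id uint64, key string) {
	for {
		s.mu.Lock()
		e, ok := s.data[key]
		if !ok || e.ownerID != id {
			s.mu.Unlock()
			return // orphaned: evicted or superseded, so stop instead of leaking
		}
		e.value++ // stand-in for the real blocking RPC fetch + store
		s.data[key] = e
		s.mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	s := &store{data: make(map[string]entry)}
	s.ensureFetcher("k")
	s.ensureFetcher("k") // no-op: the first goroutine still owns "k"
	time.Sleep(55 * time.Millisecond)
	s.mu.Lock()
	defer s.mu.Unlock()
	fmt.Printf("value=%d, refreshed by exactly one goroutine\n", s.data["k"].value)
}
```

The key design point, as in the patch, is that the ID check and the work happen under the same lock, so a replacement goroutine can never interleave with an orphan.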
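For reference, the backoff shape that backOffWait implements can also be seen in isolation. The sketch below mirrors the patched function under stated assumptions: the constants stand in for DefaultCacheRefreshBackoffMin/DefaultCacheRefreshMaxWait, and math/rand jitter stands in for Consul's lib.RandomStagger:

```go
// Standalone sketch of the capped exponential backoff in backOffWait.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	backoffMin = 3               // stands in for DefaultCacheRefreshBackoffMin
	maxWait    = 1 * time.Minute // stands in for DefaultCacheRefreshMaxWait
)

func backOffWait(failures uint) time.Duration {
	if failures <= backoffMin {
		return 0 // the first few failures retry immediately
	}
	shift := failures - backoffMin
	wait := maxWait
	if shift < 31 { // guard the shift so the duration cannot overflow
		wait = (1 << shift) * time.Second
	}
	if wait > maxWait {
		wait = maxWait
	}
	// Up to one extra wait of jitter so many entries don't retry in lockstep.
	return wait + time.Duration(rand.Int63n(int64(wait)))
}

func main() {
	for f := uint(0); f <= 10; f++ {
		fmt.Printf("failures=%2d wait=%s\n", f, backOffWait(f))
	}
}
```

Running it shows waits of 0 through failure 3, then roughly 2s, 4s, 8s, ... capped at one minute (plus jitter), which is why TestAgent lowers CacheRefreshMaxWait to one second to keep tests fast.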