diff --git a/.changelog/16818.txt b/.changelog/16818.txt
new file mode 100644
index 000000000..665c11034
--- /dev/null
+++ b/.changelog/16818.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+cache: revert cache refactor which could cause blocking queries to never return
+```
diff --git a/agent/agent_test.go b/agent/agent_test.go
index 441b93966..a0f688f5f 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -4823,19 +4823,19 @@ services {
     deadlineCh := time.After(10 * time.Second)
     start := time.Now()
+LOOP:
     for {
         select {
         case evt := <-ch:
             // We may receive several notifications of an error until we get the
             // first successful reply.
             require.Equal(t, "foo", evt.CorrelationID)
-            if evt.Err == nil {
-                require.NoError(t, evt.Err)
-                require.NotNil(t, evt.Result)
-                t.Logf("took %s to get first success", time.Since(start))
-                return
+            if evt.Err != nil {
+                break LOOP
             }
-            t.Logf("saw error: %v", evt.Err)
+            require.NoError(t, evt.Err)
+            require.NotNil(t, evt.Result)
+            t.Logf("took %s to get first success", time.Since(start))
         case <-deadlineCh:
             t.Fatal("did not get notified successfully")
         }
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index 4d9ed01fb..ed1e4f911 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -87,8 +87,8 @@ var Counters = []prometheus.CounterDefinition{
 // Constants related to refresh backoff. We probably don't ever need to
 // make these configurable knobs since they primarily exist to lower load.
 const (
-    DefaultCacheRefreshBackoffMin = 3               // 3 attempts before backing off
-    DefaultCacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
+    CacheRefreshBackoffMin = 3               // 3 attempts before backing off
+    CacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time

     // The following constants are default values for the cache entry
     // rate limiter settings.
@@ -141,7 +141,10 @@ type Cache struct {
     entriesLock       sync.RWMutex
     entries           map[string]cacheEntry
     entriesExpiryHeap *ttlcache.ExpiryHeap
-    lastGoroutineID   uint64
+
+    fetchLock    sync.Mutex
+    lastFetchID  uint64
+    fetchHandles map[string]fetchHandle

     // stopped is used as an atomic flag to signal that the Cache has been
     // discarded so background fetches and expiry processing should stop.
@@ -154,6 +157,11 @@ type Cache struct {
     rateLimitCancel context.CancelFunc
 }

+type fetchHandle struct {
+    id     uint64
+    stopCh chan struct{}
+}
+
 // typeEntry is a single type that is registered with a Cache.
 type typeEntry struct {
     // Name that was used to register the Type
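The fetchHandle type restored above is what keeps background fetchers from leaking: each cache key owns at most one handle, and installing a replacement closes the previous handle's stopCh so a superseded goroutine can notice and exit. A minimal, self-contained sketch of that pattern (the names below are illustrative, not the cache package's internals):

```go
package main

import (
	"fmt"
	"sync"
)

type handle struct {
	id     uint64
	stopCh chan struct{}
}

type registry struct {
	mu      sync.Mutex
	lastID  uint64
	handles map[string]handle
}

// replace installs a fresh handle for key, telling any previous owner to stop.
func (r *registry) replace(key string) handle {
	r.mu.Lock()
	defer r.mu.Unlock()
	if prev, ok := r.handles[key]; ok {
		close(prev.stopCh) // the old fetcher observes this and returns
	}
	r.lastID++
	h := handle{id: r.lastID, stopCh: make(chan struct{})}
	r.handles[key] = h
	return h
}

func main() {
	r := &registry{handles: map[string]handle{}}
	h1 := r.replace("svc")
	h2 := r.replace("svc") // supersedes h1
	select {
	case <-h1.stopCh:
		fmt.Printf("fetcher %d told to stop; fetcher %d is current\n", h1.id, h2.id)
	default:
	}
}
```

The same idea appears later in the restored getOrReplaceFetchHandle and deleteFetchHandle helpers.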
@@ -199,13 +207,6 @@ type Options struct {
     EntryFetchMaxBurst int
     // EntryFetchRate represents the max calls/sec for a single cache entry
     EntryFetchRate rate.Limit
-
-    // CacheRefreshBackoffMin is the number of attempts to wait before backing off.
-    // Mostly configurable just for testing.
-    CacheRefreshBackoffMin uint
-    // CacheRefreshMaxWait is the maximum backoff wait time.
-    // Mostly configurable just for testing.
-    CacheRefreshMaxWait time.Duration
 }

 // Equal return true if both options are equivalent
@@ -221,12 +222,6 @@ func applyDefaultValuesOnOptions(options Options) Options {
     if options.EntryFetchMaxBurst == 0 {
         options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
     }
-    if options.CacheRefreshBackoffMin == 0 {
-        options.CacheRefreshBackoffMin = DefaultCacheRefreshBackoffMin
-    }
-    if options.CacheRefreshMaxWait == 0 {
-        options.CacheRefreshMaxWait = DefaultCacheRefreshMaxWait
-    }
     if options.Logger == nil {
         options.Logger = hclog.New(nil)
     }
@@ -242,6 +237,7 @@ func New(options Options) *Cache {
         types:             make(map[string]typeEntry),
         entries:           make(map[string]cacheEntry),
         entriesExpiryHeap: ttlcache.NewExpiryHeap(),
+        fetchHandles:      make(map[string]fetchHandle),
         stopCh:            make(chan struct{}),
         options:           options,
         rateLimitContext:  ctx,
@@ -411,23 +407,11 @@ func (c *Cache) getEntryLocked(
     // Check if re-validate is requested. If so the first time round the
     // loop is not a hit but subsequent ones should be treated normally.
     if !tEntry.Opts.Refresh && info.MustRevalidate {
-        // It is important to note that this block ONLY applies when we are not
-        // in indefinite refresh mode (where the underlying goroutine will
-        // continue to re-query for data).
-        //
-        // In this mode goroutines have a 1:1 relationship to RPCs that get
-        // executed, and importantly they DO NOT SLEEP after executing.
-        //
-        // This means that a running goroutine for this cache entry extremely
-        // strongly implies that the RPC has not yet completed, which is why
-        // this check works for the revalidation-avoidance optimization here.
-        if entry.GoroutineID != 0 {
-            // There is an active goroutine performing a blocking query for
-            // this data, which has not returned.
-            //
-            // We can logically deduce that the contents of the cache are
-            // actually current, and we can simply return this while leaving
-            // the blocking query alone.
+        if entry.Fetching {
+            // There is an active blocking query for this data, which has not
+            // returned. We can logically deduce that the contents of the cache
+            // are actually current, and we can simply return this while
+            // leaving the blocking query alone.
             return true, true, entry
         }
         return true, false, entry
@@ -557,7 +541,7 @@ RETRY_GET:
     // At this point, we know we either don't have a value at all or the
     // value we have is too old. We need to wait for new data.
-    waiterCh := c.fetch(key, r)
+    waiterCh := c.fetch(key, r, true, 0, false)

     // No longer our first time through
     first = false
@@ -584,36 +568,46 @@ func makeEntryKey(t, dc, peerName, token, key string) string {
     return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
 }

-// fetch triggers a new background fetch for the given Request. If a background
-// fetch is already running or a goroutine to manage that still exists for a
-// matching Request, the waiter channel for that request is returned. The
-// effect of this is that there is only ever one blocking query and goroutine
-// for any matching requests.
-func (c *Cache) fetch(key string, r getOptions) <-chan struct{} {
+// fetch triggers a new background fetch for the given Request. If a
+// background fetch is already running for a matching Request, the waiter
+// channel for that request is returned. The effect of this is that there
+// is only ever one blocking query for any matching requests.
+//
+// If allowNew is true then the fetch should create the cache entry
+// if it doesn't exist. If this is false, then fetch will do nothing
+// if the entry doesn't exist. This latter case is to support refreshing.
+func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} {
+    // We acquire a write lock because we may have to set Fetching to true.
     c.entriesLock.Lock()
     defer c.entriesLock.Unlock()

     ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
-    switch {
-    case ok && entryValid:
-        // This handles the case where a fetch succeeded after checking for its
-        // existence in getWithIndex. This ensures that we don't miss updates.
+
+    // This handles the case where a fetch succeeded after checking for its existence in
+    // getWithIndex. This ensures that we don't miss updates.
+    if ok && entryValid && !ignoreExisting {
         ch := make(chan struct{})
         close(ch)
         return ch
+    }

-    case ok && entry.GoroutineID != 0:
-        // If we already have an entry and there's a goroutine to keep it
-        // refreshed then don't spawn another one to do the same work.
-        //
-        // Return the currently active waiter.
+    // If we aren't allowing new values and we don't have an existing value,
+    // return immediately. We return an immediately-closed channel so nothing
+    // blocks.
+    if !ok && !allowNew {
+        ch := make(chan struct{})
+        close(ch)
+        return ch
+    }
+
+    // If we already have an entry and it is actively fetching, then return
+    // the currently active waiter.
+    if ok && entry.Fetching {
         return entry.Waiter
+    }

-    case !ok:
-        // If we don't have an entry, then create it. The entry must be marked
-        // as invalid so that it isn't returned as a valid value for a zero
-        // index.
+    // If we don't have an entry, then create it. The entry must be marked
+    // as invalid so that it isn't returned as a valid value for a zero index.
+    if !ok {
         entry = cacheEntry{
             Valid:  false,
             Waiter: make(chan struct{}),
@@ -624,100 +618,27 @@ func (c *Cache) fetch(key string, r getOptions) <-chan struct{} {
         }
     }

-    // Assign each background fetching goroutine a unique ID and fingerprint
-    // the cache entry with the same ID. This way if the cache entry is ever
-    // cleaned up due to expiry and later recreated the old goroutine can
-    // detect that and terminate rather than leak and do double work.
-    c.lastGoroutineID++
-    entry.GoroutineID = c.lastGoroutineID
+    // Set that we're fetching to true, which makes it so that future
+    // identical calls to fetch will return the same waiter rather than
+    // perform multiple fetches.
+    entry.Fetching = true
     c.entries[key] = entry
     metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
     metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))

-    // The actual Fetch must be performed in a goroutine.
-    go c.launchBackgroundFetcher(entry.GoroutineID, key, r)
-
-    return entry.Waiter
-}
-
-func (c *Cache) launchBackgroundFetcher(goroutineID uint64, key string, r getOptions) {
-    defer func() {
-        c.entriesLock.Lock()
-        defer c.entriesLock.Unlock()
-        entry, ok := c.entries[key]
-        if ok && entry.GoroutineID == goroutineID {
-            entry.GoroutineID = 0
-            c.entries[key] = entry
-        }
-    }()
-
-    var attempt uint
-    for {
-        shouldStop, shouldBackoff := c.runBackgroundFetcherOnce(goroutineID, key, r)
-        if shouldStop {
-            return
-        }
-
-        if shouldBackoff {
-            attempt++
-        } else {
-            attempt = 0
-        }
-
-        // If we're over the attempt minimum, start an exponential backoff.
-        wait := backOffWait(c.options, attempt)
-
-        // If we have a timer, wait for it
-        wait += r.TypeEntry.Opts.RefreshTimer
-
-        select {
-        case <-time.After(wait):
-        case <-c.stopCh:
-            return // Check if cache was stopped
-        }
-
-        // Trigger.
-        r.Info.MustRevalidate = false
-        r.Info.MinIndex = 0
-
-        // We acquire a write lock because we may have to set Fetching to true.
-        c.entriesLock.Lock()
-
-        entry, ok := c.entries[key]
-        if !ok || entry.GoroutineID != goroutineID {
-            // If we don't have an existing entry, return immediately.
-            //
-            // Also if we already have an entry and it is actively fetching, then
-            // return immediately.
-            //
-            // If we've somehow lost control of the entry, also return.
-            c.entriesLock.Unlock()
-            return
-        }
-
-        c.entries[key] = entry
-        metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
-        metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
-        c.entriesLock.Unlock()
-    }
-}
-
-func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOptions) (shouldStop, shouldBackoff bool) {
-    // Freshly re-read this, rather than relying upon the caller to fetch it
-    // and pass it in.
-    c.entriesLock.RLock()
-    entry, ok := c.entries[key]
-    c.entriesLock.RUnlock()
-
-    if !ok || entry.GoroutineID != goroutineID {
-        // If we don't have an existing entry, return immediately.
-        //
-        // Also if something weird has happened to orphan this goroutine, also
-        // return immediately.
-        return true, false
-    }
-
     tEntry := r.TypeEntry
-    { // NOTE: this indentation is here to facilitate the PR review diff only
+
+    // The actual Fetch must be performed in a goroutine. Ensure that we only
+    // have one in-flight at a time, but don't use a deferred
+    // context.WithCancel style termination so that these things outlive their
+    // requester.
+    //
+    // By the time we get here the system WANTS to make a replacement fetcher, so
+    // we terminate the prior one and replace it.
+    handle := c.getOrReplaceFetchHandle(key)
+    go func(handle fetchHandle) {
+        defer c.deleteFetchHandle(key, handle.id)
+
         // If we have background refresh and currently are in "disconnected" state,
         // waiting for a response might mean we mark our results as stale for up to
         // 10 minutes (max blocking timeout) after connection is restored. To reduce
@@ -731,7 +652,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             c.entriesLock.Lock()
             defer c.entriesLock.Unlock()
             entry, ok := c.entries[key]
-            if !ok || entry.RefreshLostContact.IsZero() || entry.GoroutineID != goroutineID {
+            if !ok || entry.RefreshLostContact.IsZero() {
                 return
             }
             entry.RefreshLostContact = time.Time{}
@@ -755,15 +676,12 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
                 Index: entry.Index,
             }
         }
-
         if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
             if connectedTimer != nil {
                 connectedTimer.Stop()
             }
             entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
-            // NOTE: this can only happen when the entire cache is being
-            // shutdown and isn't something that can happen normally.
-            return true, false
+            return
         }
         // Start building the new entry by blocking on the fetch.
         result, err := r.Fetch(fOpts)
@@ -771,8 +689,17 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             connectedTimer.Stop()
         }

+        // If we were stopped while waiting on a blocking query now would be a
+        // good time to detect that.
+        select {
+        case <-handle.stopCh:
+            return
+        default:
+        }
+
         // Copy the existing entry to start.
         newEntry := entry
+        newEntry.Fetching = false

         // Importantly, always reset the Error. Having both Error and a Value that
         // are non-nil is allowed in the cache entry but it indicates that the Error
@@ -828,7 +755,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         if result.Index > 0 {
             // Reset the attempts counter so we don't have any backoff
-            shouldBackoff = false
+            attempt = 0
         } else {
             // Result having a zero index is an implicit error case. There was no
             // actual error but it implies the RPC found in index (nothing written
@@ -843,7 +770,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             // state it can be considered a bug in the RPC implementation (to ever
             // return a zero index) however since it can happen this is a safety net
             // for the future.
-            shouldBackoff = true
+            attempt++
         }

         // If we have refresh active, this successful response means cache is now
@@ -863,7 +790,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         metrics.IncrCounterWithLabels([]string{"cache", tEntry.Name, "fetch_error"}, 1, labels)

         // Increment attempt counter
-        shouldBackoff = true
+        attempt++

         // If we are refreshing and just failed, updated the lost contact time as
         // our cache will be stale until we get successfully reconnected. We only
@@ -880,7 +807,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         // Set our entry
         c.entriesLock.Lock()

-        if currEntry, ok := c.entries[key]; !ok || currEntry.GoroutineID != goroutineID {
+        if _, ok := c.entries[key]; !ok {
             // This entry was evicted during our fetch. DON'T re-insert it or fall
             // through to the refresh loop below otherwise it will live forever! In
             // theory there should not be any Get calls waiting on entry.Waiter since
@@ -893,7 +820,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             // Trigger any waiters that are around.
             close(entry.Waiter)

-            return true, false
+            return
         }

         // If this is a new entry (not in the heap yet), then setup the
@@ -918,22 +845,79 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         // request back up again shortly but in the general case this prevents
         // spamming the logs with tons of ACL not found errors for days.
         if tEntry.Opts.Refresh && !preventRefresh {
-            return false, shouldBackoff
-        }
-    }
+            // Check if cache was stopped
+            if atomic.LoadUint32(&c.stopped) == 1 {
+                return
+            }

-    return true, false
+            // If we're over the attempt minimum, start an exponential backoff.
+            wait := backOffWait(attempt)
+
+            // If we have a timer, wait for it
+            wait += tEntry.Opts.RefreshTimer
+
+            select {
+            case <-time.After(wait):
+            case <-handle.stopCh:
+                return
+            }
+
+            // Trigger. The "allowNew" field is false because in the time we were
+            // waiting to refresh we may have expired and got evicted. If that
+            // happened, we don't want to create a new entry.
+            r.Info.MustRevalidate = false
+            r.Info.MinIndex = 0
+            c.fetch(key, r, false, attempt, true)
+        }
+    }(handle)
+
+    return entry.Waiter
 }
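The restored fetch above is the classic single-flight shape: the first caller for a key starts the blocking query, every later caller parks on the entry's Waiter channel, and that channel is closed once a result or error is stored. A rough standalone sketch of the shape, assuming no TTLs and no background refresh (all names here are illustrative, not the cache package's internals):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	value    string
	fetching bool
	waiter   chan struct{} // closed when the in-flight fetch completes
}

type cache struct {
	mu      sync.Mutex
	entries map[string]*entry
}

// get lets exactly one caller per key run fetch; everyone else blocks on the
// shared waiter channel and reads the stored value afterwards.
func (c *cache) get(key string, fetch func() string) string {
	c.mu.Lock()
	if e, ok := c.entries[key]; ok {
		if !e.fetching {
			v := e.value
			c.mu.Unlock()
			return v
		}
		w := e.waiter
		c.mu.Unlock()
		<-w // wait for the single in-flight fetch
		c.mu.Lock()
		v := c.entries[key].value
		c.mu.Unlock()
		return v
	}
	e := &entry{fetching: true, waiter: make(chan struct{})}
	c.entries[key] = e
	c.mu.Unlock()

	v := fetch() // only this caller performs the expensive query

	c.mu.Lock()
	e.value, e.fetching = v, false
	close(e.waiter) // wake all waiters at once
	c.mu.Unlock()
	return v
}

func main() {
	c := &cache{entries: map[string]*entry{}}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slow := func() string { time.Sleep(10 * time.Millisecond); return "10.0.0.1:8080" }
			fmt.Println(c.get("web", slow))
		}()
	}
	wg.Wait()
}
```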
-func backOffWait(opts Options, failures uint) time.Duration {
-    if failures > opts.CacheRefreshBackoffMin {
-        shift := failures - opts.CacheRefreshBackoffMin
-        waitTime := opts.CacheRefreshMaxWait
+func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
+    c.fetchLock.Lock()
+    defer c.fetchLock.Unlock()
+
+    if prevHandle, ok := c.fetchHandles[key]; ok {
+        close(prevHandle.stopCh)
+    }
+
+    c.lastFetchID++
+
+    handle := fetchHandle{
+        id:     c.lastFetchID,
+        stopCh: make(chan struct{}),
+    }
+
+    c.fetchHandles[key] = handle
+
+    return handle
+}
+
+func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
+    c.fetchLock.Lock()
+    defer c.fetchLock.Unlock()
+
+    // Only remove a fetchHandle if it's YOUR fetchHandle.
+    handle, ok := c.fetchHandles[key]
+    if !ok {
+        return
+    }
+
+    if handle.id == fetchID {
+        delete(c.fetchHandles, key)
+    }
+}
+
+func backOffWait(failures uint) time.Duration {
+    if failures > CacheRefreshBackoffMin {
+        shift := failures - CacheRefreshBackoffMin
+        waitTime := CacheRefreshMaxWait
         if shift < 31 {
             waitTime = (1 << shift) * time.Second
         }
-        if waitTime > opts.CacheRefreshMaxWait {
-            waitTime = opts.CacheRefreshMaxWait
+        if waitTime > CacheRefreshMaxWait {
+            waitTime = CacheRefreshMaxWait
         }
         return waitTime + lib.RandomStagger(waitTime)
     }
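With the package-level constants restored above (CacheRefreshBackoffMin = 3 and CacheRefreshMaxWait = 1 minute), the curve produced by backOffWait is easy to check by hand: no extra wait for the first three failures, then 2s, 4s, 8s and so on up to the one-minute cap, each plus a random stagger. A standalone approximation for illustration; it deliberately omits the lib.RandomStagger jitter:

```go
package main

import (
	"fmt"
	"time"
)

const (
	backoffMin = 3               // stands in for CacheRefreshBackoffMin
	maxWait    = 1 * time.Minute // stands in for CacheRefreshMaxWait
)

// wait mirrors the shape of backOffWait without the random stagger.
func wait(failures uint) time.Duration {
	if failures <= backoffMin {
		return 0
	}
	shift := failures - backoffMin
	w := maxWait
	if shift < 31 {
		w = (1 << shift) * time.Second
	}
	if w > maxWait {
		w = maxWait
	}
	return w
}

func main() {
	for f := uint(1); f <= 10; f++ {
		fmt.Printf("failures=%2d wait=%s\n", f, wait(f))
	}
	// failures 1-3 -> 0s, 4 -> 2s, 5 -> 4s, 6 -> 8s, 7 -> 16s, 8 -> 32s, 9+ -> 1m0s
}
```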
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index 56413e2d5..4ab66a29d 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -21,6 +21,7 @@ import (
     "github.com/hashicorp/consul/acl"
     "github.com/hashicorp/consul/lib/ttlcache"
     "github.com/hashicorp/consul/sdk/testutil"
+    "github.com/hashicorp/consul/sdk/testutil/retry"
 )

 // Test a basic Get with no indexes (and therefore no blocking queries).
@@ -1753,12 +1754,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, true, result)

+    waitUntilFetching := func(expectValue bool) {
+        retry.Run(t, func(t *retry.R) {
+            c.entriesLock.Lock()
+            defer c.entriesLock.Unlock()
+            entry, ok := c.entries[key]
+            require.True(t, ok)
+            if expectValue {
+                require.True(t, entry.Fetching)
+            } else {
+                require.False(t, entry.Fetching)
+            }
+        })
+    }
+
     // ensure that the entry is fetching again
-    c.entriesLock.Lock()
-    entry, ok := c.entries[key]
-    require.True(t, ok)
-    require.True(t, entry.GoroutineID > 0)
-    c.entriesLock.Unlock()
+    waitUntilFetching(true)

     requestChan := make(chan error)

@@ -1792,11 +1803,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
     }

     // ensure that the entry is fetching again
-    c.entriesLock.Lock()
-    entry, ok = c.entries[key]
-    require.True(t, ok)
-    require.True(t, entry.GoroutineID > 0)
-    c.entriesLock.Unlock()
+    waitUntilFetching(true)

     // background a call that will wait for a newer version - will result in an acl not found error
     go getError(5)
@@ -1817,11 +1824,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {

     // ensure that the ACL not found error killed off the background refresh
     // but didn't remove it from the cache
-    c.entriesLock.Lock()
-    entry, ok = c.entries[key]
-    require.True(t, ok)
-    require.False(t, entry.GoroutineID > 0)
-    c.entriesLock.Unlock()
+    waitUntilFetching(false)
 }

 type fakeType struct {
diff --git a/agent/cache/entry.go b/agent/cache/entry.go
index b17c008fe..fb8008d8c 100644
--- a/agent/cache/entry.go
+++ b/agent/cache/entry.go
@@ -29,9 +29,9 @@ type cacheEntry struct {
     Index uint64

     // Metadata that is used for internal accounting
-    Valid       bool          // True if the Value is set
-    GoroutineID uint64        // Nonzero if a fetch goroutine is running.
-    Waiter      chan struct{} // Closed when this entry is invalidated
+    Valid    bool          // True if the Value is set
+    Fetching bool          // True if a fetch is already active
+    Waiter   chan struct{} // Closed when this entry is invalidated

     // Expiry contains information about the expiration of this
     // entry. This is a pointer as its shared as a value in the
diff --git a/agent/cache/watch.go b/agent/cache/watch.go
index 5bea14d90..d8693ad03 100644
--- a/agent/cache/watch.go
+++ b/agent/cache/watch.go
@@ -140,7 +140,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati
             failures = 0
         } else {
             failures++
-            wait = backOffWait(c.options, failures)
+            wait = backOffWait(failures)

             c.options.Logger.
                 With("error", err).
@@ -227,7 +227,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio
         // as this would eliminate the single-flighting of these requests in the cache and
         // the efficiencies gained by it.
         if failures > 0 {
-            wait = backOffWait(c.options, failures)
+            wait = backOffWait(failures)
         } else {
             // Calculate when the cached data's Age will get too stale and
             // need to be re-queried. When the data's Age already exceeds the
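Both notify loops touched above share the same retry shape: reset the failure counter after a successful fetch, bump it on error, and sleep for backOffWait(failures) before going around again. A generic sketch of that loop shape under simplified assumptions; fetchOnce and toyBackOff below are stand-ins, not the cache package's real helpers:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// toyBackOff stands in for backOffWait: nothing for the first failure, then a
// growing delay.
func toyBackOff(failures uint) time.Duration {
	if failures <= 1 {
		return 0
	}
	return time.Duration(failures-1) * 50 * time.Millisecond
}

// notifyLoop re-runs fetchOnce until ctx is done, backing off only while
// fetchOnce keeps failing.
func notifyLoop(ctx context.Context, fetchOnce func() error) {
	var failures uint
	for ctx.Err() == nil {
		if err := fetchOnce(); err != nil {
			failures++
			fmt.Printf("fetch failed (%v), backing off %s\n", err, toyBackOff(failures))
		} else {
			failures = 0
		}
		select {
		case <-time.After(toyBackOff(failures)):
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	calls := 0
	notifyLoop(ctx, func() error {
		time.Sleep(50 * time.Millisecond) // stands in for the blocking query itself
		calls++
		if calls < 3 {
			return errors.New("transient upstream error")
		}
		return nil
	})
}
```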
diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
index 37c3e692f..054a284ae 100644
--- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
+++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -79,8 +79,6 @@
     "BootstrapExpect": 0,
     "BuildDate": "2019-11-20 05:00:00 +0000 UTC",
     "Cache": {
-        "CacheRefreshBackoffMin": 0,
-        "CacheRefreshMaxWait": "0s",
         "EntryFetchMaxBurst": 42,
         "EntryFetchRate": 0.334,
         "Logger": null
diff --git a/agent/testagent.go b/agent/testagent.go
index 499f4d4dc..57b9c1b11 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -214,9 +214,6 @@ func (a *TestAgent) Start(t *testing.T) error {
     } else {
         result.RuntimeConfig.Telemetry.Disable = true
     }
-    // Lower the maximum backoff period of a cache refresh just for
-    // tests see #14956 for more.
-    result.RuntimeConfig.Cache.CacheRefreshMaxWait = 1 * time.Second
     // Lower the resync interval for tests.
     result.RuntimeConfig.LocalProxyConfigResyncInterval = 250 * time.Millisecond
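Finally, because entry.Fetching is now flipped by a background goroutine, TestCache_RefreshLifeCycle above polls for the flag with the sdk retry helper instead of asserting it once. A hedged sketch of that polling-assertion idiom; waitFor and its condition are illustrative, not the test's actual code:

```go
package cache

import (
	"testing"

	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/stretchr/testify/require"
)

// waitFor keeps re-evaluating cond until it holds or the retryer gives up,
// which is the safe way to assert on state that another goroutine mutates.
func waitFor(t *testing.T, cond func() bool) {
	t.Helper()
	retry.Run(t, func(r *retry.R) {
		require.True(r, cond())
	})
}
```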