From ae5c0aad39c710381162f8ec9cf68810919da81f Mon Sep 17 00:00:00 2001
From: Paul Banks
Date: Thu, 8 Apr 2021 11:11:15 +0100
Subject: [PATCH] cache: Fix bug where connection errors can cause early cache
 expiry (#9979)

Fixes a cache bug where the TTL is not updated while a value isn't
changing or the cache entry is returning fetch errors.
---
 .changelog/9979.txt       |  4 +++
 agent/cache/cache.go      | 26 ++++++++++++---
 agent/cache/cache_test.go | 66 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 91 insertions(+), 5 deletions(-)
 create mode 100644 .changelog/9979.txt

diff --git a/.changelog/9979.txt b/.changelog/9979.txt
new file mode 100644
index 000000000..10880576b
--- /dev/null
+++ b/.changelog/9979.txt
@@ -0,0 +1,4 @@
+```release-note:bug
+cache: fix a bug in the client agent cache where streaming would disconnect every
+20 minutes and cause delivery delays. [[GH-9979](https://github.com/hashicorp/consul/pull/9979)].
+```
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index a73db9b4e..d104cdd3b 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -413,6 +413,27 @@ RETRY_GET:
 	_, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
 	c.entriesLock.RUnlock()
 
+	if entry.Expiry != nil {
+		// The entry already exists in the TTL heap; touch it to keep it alive since
+		// this Get is still interested in the value. Note that we used to only do
+		// this in the `entryValid` block below, but that meant a cache entry
+		// would expire after its TTL regardless of how many callers were waiting
+		// for updates in this method, in a couple of cases:
+		// 1. If the agent is disconnected from servers for the TTL, then the client
+		//    will be in backoff getting errors on each call to Get, and since an
+		//    errored cache entry has Valid = false it won't be touching the TTL.
+		// 2. If the value is just not changing, then the client's current index
+		//    will be equal to the entry index and entryValid will be false. This
+		//    is a common case!
+		//
+		// But regardless of the state of the entry, assuming it's already in the
+		// TTL heap, we should touch it every time around here since this caller at
+		// least still cares about the value!
+		c.entriesLock.Lock()
+		c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
+		c.entriesLock.Unlock()
+	}
+
 	if entryValid {
 		meta := ResultMeta{Index: entry.Index}
 		if first {
@@ -435,11 +456,6 @@
 		}
 	}
 
-	// Touch the expiration and fix the heap.
-	c.entriesLock.Lock()
-	c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
-	c.entriesLock.Unlock()
-
 	// We purposely do not return an error here since the cache only works with
 	// fetching values that either have a value or have an error, but not both.
 	// The Error may be non-nil in the entry in the case that an error has
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index a21c7c7a0..c986f19fd 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -1042,6 +1042,72 @@ func TestCacheGet_expireResetGet(t *testing.T) {
 	typ.AssertExpectations(t)
 }
 
+// Test that entries reset their TTL on Get even when the value isn't changing.
+func TestCacheGet_expireResetGetNoChange(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	// Create a closer so we can tell if the entry gets evicted.
+	closer := &testCloser{}
+
+	typ := &MockType{}
+	typ.On("RegisterOptions").Return(RegisterOptions{
+		LastGetTTL:       150 * time.Millisecond,
+		SupportsBlocking: true,
+		Refresh:          true,
+	})
+	typ.On("Fetch", mock.Anything, mock.Anything).
+		Return(func(o FetchOptions, r Request) FetchResult {
+			if o.MinIndex == 10 {
+				// Simulate a very fast timeout from the backend. This must be shorter
+				// than the TTL above (as it would be in real life) so that Fetch
+				// returns a few times with the same value, which _should_ cause the
+				// blocking watch to go round the Get loop and so keep the cache
+				// entry from being evicted.
+				time.Sleep(10 * time.Millisecond)
+			}
+			return FetchResult{Value: 42, Index: 10, State: closer}
+		}, func(o FetchOptions, r Request) error {
+			return nil
+		})
+	defer typ.AssertExpectations(t)
+	c := New(Options{})
+
+	// Register the type with a timeout
+	c.RegisterType("t", typ)
+
+	// Get, should fetch
+	req := TestRequest(t, RequestInfo{Key: "hello"})
+	result, meta, err := c.Get(context.Background(), "t", req)
+	require.NoError(err)
+	require.Equal(42, result)
+	require.Equal(uint64(10), meta.Index)
+	require.False(meta.Hit)
+
+	// Do a blocking watch of the value that won't time out until after the TTL.
+	start := time.Now()
+	req = TestRequest(t, RequestInfo{Key: "hello", MinIndex: 10, Timeout: 300 * time.Millisecond})
+	result, meta, err = c.Get(context.Background(), "t", req)
+	require.NoError(err)
+	require.Equal(42, result)
+	require.Equal(uint64(10), meta.Index)
+	require.GreaterOrEqual(time.Since(start).Milliseconds(), int64(300))
+
+	// This is the point of this test! Even though we waited for a change for
+	// longer than the TTL, we should have been updating the TTL, so the cache
+	// entry should not have been evicted. We can't verify that with meta.Hit
+	// since that is not set for blocking Get calls, but we can assert that the
+	// entry was never closed (which, assuming the test for eviction closing is
+	// also passing, is a reliable signal).
+	require.False(closer.isClosed(), "cache entry should not have been evicted")
+
+	// Sleep a tiny bit just to let any background calls happen, then verify
+	// that we still only got the one call.
+	time.Sleep(20 * time.Millisecond)
+	typ.AssertExpectations(t)
+}
+
 // Test that entries with state that satisfies io.Closer get cleaned up
 func TestCacheGet_expireClose(t *testing.T) {
 	if testing.Short() {
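
The core of the fix reads cleanest outside the diff: touch the expiry deadline on every Get, before any validity or change check, so an entry stays alive for as long as callers keep asking for it, even when the value is unchanged or fetches are erroring. Below is a minimal, self-contained Go sketch of that pattern, not Consul's actual implementation: `ttlCache`, `entry`, `errNotFound`, and the map-scan reaper are all invented for illustration (the real code touches an expiry heap via `entriesExpiryHeap.Update(entry.Expiry.Index(), ...)`).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errNotFound = errors.New("no cache entry")

// entry holds a cached value or a cached fetch error, plus its eviction
// deadline. Either kind of entry should be kept alive by interested callers.
type entry struct {
	value  interface{}
	err    error
	expiry time.Time
}

type ttlCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	entries map[string]*entry
}

func newTTLCache(ttl time.Duration) *ttlCache {
	c := &ttlCache{ttl: ttl, entries: make(map[string]*entry)}
	go c.reap()
	return c
}

// Get touches the entry's deadline unconditionally, before looking at what
// the entry holds. Touching only "valid" entries recreates the bug fixed
// above: an unchanged value or a string of fetch errors would let the TTL
// lapse even with callers actively blocked on updates.
func (c *ttlCache) Get(key string) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[key]
	if !ok {
		return nil, errNotFound
	}
	e.expiry = time.Now().Add(c.ttl) // touch first, then return whatever we have
	return e.value, e.err
}

func (c *ttlCache) Set(key string, v interface{}, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = &entry{value: v, err: err, expiry: time.Now().Add(c.ttl)}
}

// reap evicts entries whose deadline has passed. Consul keeps a heap ordered
// by expiry; a periodic map scan keeps this sketch short.
func (c *ttlCache) reap() {
	for range time.Tick(10 * time.Millisecond) {
		c.mu.Lock()
		now := time.Now()
		for k, e := range c.entries {
			if now.After(e.expiry) {
				delete(c.entries, k)
			}
		}
		c.mu.Unlock()
	}
}

func main() {
	c := newTTLCache(50 * time.Millisecond)
	c.Set("hello", 42, nil)

	// Poll faster than the TTL: every Get resets the deadline, so the entry
	// outlives many TTL intervals while anyone still wants it.
	for i := 0; i < 10; i++ {
		time.Sleep(20 * time.Millisecond)
		if _, err := c.Get("hello"); errors.Is(err, errNotFound) {
			fmt.Println("evicted early (the bug this patch fixes)")
			return
		}
	}
	fmt.Println("entry kept alive by interested callers")
}
```

Deleting the touch line in `Get` makes the loop above report an early eviction after roughly one TTL, which is the pre-patch failure mode in miniature and the scenario `TestCacheGet_expireResetGetNoChange` guards against.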