cache: Fix bug where connection errors can cause early cache expiry (#9979)
Fixes a cache bug where the TTL was not updated while a value wasn't changing or the cache entry was returning fetch errors.
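To illustrate the failure mode, here is a minimal, self-contained Go sketch (not Consul's actual cache code; the `entry` type, `touch` helper, and timings are invented for illustration) contrasting the old policy of touching the TTL only when a fetch returns a new valid result with the fixed policy of touching it on every Get:

```go
package main

import (
	"fmt"
	"time"
)

type entry struct {
	value    int
	deadline time.Time
}

// touch resets the entry's TTL; in the real cache this is the
// entriesExpiryHeap.Update call shown in the diff below.
func (e *entry) touch(ttl time.Duration) {
	e.deadline = time.Now().Add(ttl)
}

func main() {
	const ttl = 100 * time.Millisecond
	touchEveryGet := false // the old behavior; set true for the fixed behavior

	e := &entry{value: 42}
	e.touch(ttl)

	// Simulate five blocking Gets where the backend value never changes, so
	// each round returns the same index and "entryValid" would be false.
	for i := 0; i < 5; i++ {
		time.Sleep(50 * time.Millisecond)
		newValue := false // value not changing => no new valid result
		if touchEveryGet || newValue {
			e.touch(ttl)
		}
		fmt.Printf("get %d: expired=%v\n", i+1, time.Now().After(e.deadline))
	}
}
```

With `touchEveryGet` false the deadline is never reset and the entry expires mid-watch even though callers are still polling; with it true the entry stays alive for as long as anyone keeps asking for it, which is what the commit implements.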
commit ae5c0aad39
parent b61e00b772
.changelog/9979.txt
@@ -0,0 +1,4 @@
+```release-note:bug
+cache: fix a bug in the client agent cache where streaming would disconnect every
+20 minutes and cause delivery delays. [[GH-9979](https://github.com/hashicorp/consul/pull/9979)].
+```
agent/cache/cache.go
@@ -413,6 +413,27 @@ RETRY_GET:
 	_, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
 	c.entriesLock.RUnlock()
 
+	if entry.Expiry != nil {
+		// The entry already exists in the TTL heap, touch it to keep it alive since
+		// this Get is still interested in the value. Note that we used to only do
+		// this in the `entryValid` block below but that means that a cache entry
+		// will expire after its TTL regardless of how many callers are waiting for
+		// updates in this method in a couple of cases:
+		//  1. If the agent is disconnected from servers for the TTL then the client
+		//     will be in backoff getting errors on each call to Get and since an
+		//     errored cache entry has Valid = false it won't be touching the TTL.
+		//  2. If the value is just not changing then the client's current index
+		//     will be equal to the entry index and entryValid will be false. This
+		//     is a common case!
+		//
+		// But regardless of the state of the entry, assuming it's already in the
+		// TTL heap, we should touch it every time around here since this caller at
+		// least still cares about the value!
+		c.entriesLock.Lock()
+		c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
+		c.entriesLock.Unlock()
+	}
+
 	if entryValid {
 		meta := ResultMeta{Index: entry.Index}
 		if first {
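The `Update` call above relies on an expiry heap that tracks each entry's position so a single entry's deadline can be extended in place. As a rough sketch of that pattern (assumed for illustration only; Consul's real heap lives in its own package and differs in detail), `container/heap`'s `Fix` makes the touch an O(log n) operation:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

type Entry struct {
	Key      string
	Deadline time.Time
	index    int // position in the heap, maintained by Swap/Push
}

type ExpiryHeap []*Entry

func (h ExpiryHeap) Len() int           { return len(h) }
func (h ExpiryHeap) Less(i, j int) bool { return h[i].Deadline.Before(h[j].Deadline) }
func (h ExpiryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}
func (h *ExpiryHeap) Push(x interface{}) {
	e := x.(*Entry)
	e.index = len(*h)
	*h = append(*h, e)
}
func (h *ExpiryHeap) Pop() interface{} {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// Update extends one entry's TTL and restores heap order; this is the same
// shape as c.entriesExpiryHeap.Update(entry.Expiry.Index(), ttl) in the diff.
func (h *ExpiryHeap) Update(idx int, ttl time.Duration) {
	(*h)[idx].Deadline = time.Now().Add(ttl)
	heap.Fix(h, idx)
}

func main() {
	h := &ExpiryHeap{}
	heap.Init(h)
	heap.Push(h, &Entry{Key: "hello", Deadline: time.Now().Add(time.Second)})
	h.Update(0, time.Minute) // a Get touches the entry, pushing out its expiry
	fmt.Println((*h)[0].Key, "expires at", (*h)[0].Deadline)
}
```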
@@ -435,11 +456,6 @@ RETRY_GET:
 		}
 	}
 
-	// Touch the expiration and fix the heap.
-	c.entriesLock.Lock()
-	c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL)
-	c.entriesLock.Unlock()
-
 	// We purposely do not return an error here since the cache only works with
 	// fetching values that either have a value or have an error, but not both.
 	// The Error may be non-nil in the entry in the case that an error has
agent/cache/cache_test.go
@@ -1042,6 +1042,72 @@ func TestCacheGet_expireResetGet(t *testing.T) {
 	typ.AssertExpectations(t)
 }
 
+// Test that entries reset their TTL on Get even when the value isn't changing.
+func TestCacheGet_expireResetGetNoChange(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	// Create a closer so we can tell if the entry gets evicted.
+	closer := &testCloser{}
+
+	typ := &MockType{}
+	typ.On("RegisterOptions").Return(RegisterOptions{
+		LastGetTTL:       150 * time.Millisecond,
+		SupportsBlocking: true,
+		Refresh:          true,
+	})
+	typ.On("Fetch", mock.Anything, mock.Anything).
+		Return(func(o FetchOptions, r Request) FetchResult {
+			if o.MinIndex == 10 {
+				// Simulate a very fast timeout from the backend. This must be shorter
+				// than the TTL above (as it would be in real life) so that fetch returns
+				// a few times with the same value, which _should_ cause the blocking watch
+				// to go round the Get loop and so keep the cache entry from being
+				// evicted.
+				time.Sleep(10 * time.Millisecond)
+			}
+			return FetchResult{Value: 42, Index: 10, State: closer}
+		}, func(o FetchOptions, r Request) error {
+			return nil
+		})
+	defer typ.AssertExpectations(t)
+	c := New(Options{})
+
+	// Register the type with a timeout
+	c.RegisterType("t", typ)
+
+	// Get, should fetch
+	req := TestRequest(t, RequestInfo{Key: "hello"})
+	result, meta, err := c.Get(context.Background(), "t", req)
+	require.NoError(err)
+	require.Equal(42, result)
+	require.Equal(uint64(10), meta.Index)
+	require.False(meta.Hit)
+
+	// Do a blocking watch of the value that won't time out until after the TTL.
+	start := time.Now()
+	req = TestRequest(t, RequestInfo{Key: "hello", MinIndex: 10, Timeout: 300 * time.Millisecond})
+	result, meta, err = c.Get(context.Background(), "t", req)
+	require.NoError(err)
+	require.Equal(42, result)
+	require.Equal(uint64(10), meta.Index)
+	require.GreaterOrEqual(time.Since(start).Milliseconds(), int64(300))
+
+	// This is the point of this test! Even though we waited for a change for
+	// longer than the TTL, we should have been updating the TTL so that the cache
+	// entry should not have been evicted. We can't verify that with meta.Hit
+	// since that is not set for blocking Get calls, but we can assert that the
+	// entry was never closed (which, assuming the test for eviction closing is
+	// also passing, is a reliable signal).
+	require.False(closer.isClosed(), "cache entry should not have been evicted")
+
+	// Sleep a tiny bit just to let maybe some background calls happen,
+	// then verify that we still only got the one call.
+	time.Sleep(20 * time.Millisecond)
+	typ.AssertExpectations(t)
+}
+
 // Test that entries with state that satisfies io.Closer get cleaned up
 func TestCacheGet_expireClose(t *testing.T) {
 	if testing.Short() {
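The test depends on a `testCloser` helper defined elsewhere in the same test file. Its assumed shape (a hypothetical reconstruction, not the verbatim helper) is an `io.Closer` that atomically records whether the cache closed, i.e. evicted, the entry:

```go
package cache

import "sync/atomic"

// testCloser is a sketch of the helper referenced by the test above. The
// cache closes an entry's State (when it implements io.Closer) on eviction,
// so a closed testCloser is evidence that the entry was evicted.
type testCloser struct {
	closed uint32
}

func (t *testCloser) Close() error {
	atomic.AddUint32(&t.closed, 1)
	return nil
}

func (t *testCloser) isClosed() bool {
	return atomic.LoadUint32(&t.closed) > 0
}
```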