cache: fix bug where TTLs were ignored leading to leaked memory in client agents (#9978)

* Fix bug in cache where TTLs are effectively ignored

This mostly affects streaming since streaming will immediately return from Fetch calls when the state is Closed on eviction which causes the race condition every time.

However this also affects all other cache types if the fetch call happens to return between the eviction and then next time around the Get loop by any client.

There is a separate bug that allows cache items to be evicted even when there are active clients which is the trigger here.

* Add changelog entry

* Update .changelog/9978.txt
This commit is contained in:
Paul Banks 2021-04-08 11:08:56 +01:00 committed by GitHub
parent 9115884c50
commit b61e00b772
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 179 additions and 2 deletions

3
.changelog/9978.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
cache: fix a bug in the client agent cache where streaming could potentially leak resources. [[GH-9978](https://github.com/hashicorp/consul/pull/9978)].
```

16
agent/cache/cache.go vendored
View File

@ -723,6 +723,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// Set our entry // Set our entry
c.entriesLock.Lock() c.entriesLock.Lock()
if _, ok := c.entries[key]; !ok {
// This entry was evicted during our fetch. DON'T re-insert it or fall
// through to the refresh loop below otherwise it will live forever! In
// theory there should not be any Get calls waiting on entry.Waiter since
// they would have prevented the eviction, but in practice there may be
// due to timing and the fact that we don't update the TTL on the entry if
// errors are being returned for a while. So we do need to unblock them,
// which will mean they recreate the entry again right away and so "reset"
// to a good state anyway!
c.entriesLock.Unlock()
// Trigger any waiters that are around.
close(entry.Waiter)
return
}
// If this is a new entry (not in the heap yet), then setup the // If this is a new entry (not in the heap yet), then setup the
// initial expiry information and insert. If we're already in // initial expiry information and insert. If we're already in
// the heap we do nothing since we're reusing the same entry. // the heap we do nothing since we're reusing the same entry.

View File

@ -802,7 +802,7 @@ func TestCacheGet_expire(t *testing.T) {
// Wait for a non-trivial amount of time to sanity check the age increases at // Wait for a non-trivial amount of time to sanity check the age increases at
// least this amount. Note that this is not a fudge for some timing-dependent // least this amount. Note that this is not a fudge for some timing-dependent
// background work it's just ensuring a non-trivial time elapses between the // background work it's just ensuring a non-trivial time elapses between the
// request above and below serilaly in this thread so short time is OK. // request above and below serially in this thread so short time is OK.
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
// Get, should not fetch, verified via the mock assertions above // Get, should not fetch, verified via the mock assertions above
@ -823,6 +823,160 @@ func TestCacheGet_expire(t *testing.T) {
require.Equal(42, result) require.Equal(42, result)
require.False(meta.Hit) require.False(meta.Hit)
// Sleep a tiny bit just to let maybe some background calls happen then verify
// that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that entries expire for background refresh types that cancel fetch on
// eviction. This is really a special case of the test below where the close
// behavior of the type forces the timing that causes the race but it's worth
// keeping explicitly anyway to make sure this behavior is supported and
// doesn't introduce any different races.
func TestCacheGet_expireBackgroudRefreshCancel(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
LastGetTTL: 400 * time.Millisecond,
Refresh: true,
RefreshTimer: 0,
SupportsBlocking: true,
})
defer typ.AssertExpectations(t)
c := New(Options{})
// Register the type with a timeout
c.RegisterType("t", typ)
// Create a cache state that is a closer that cancels the context on close
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
closer := &testCloser{
closeFn: func() {
cancel()
},
}
// Configure the type
typ.On("Fetch", mock.Anything, mock.Anything).
Return(func(o FetchOptions, r Request) FetchResult {
return FetchResult{Value: 8, Index: 4, State: closer}
}, func(o FetchOptions, r Request) error {
if o.MinIndex == 4 {
// Simulate waiting for a new value on second call until the cache type
// is evicted
<-ctx.Done()
return ctx.Err()
}
return nil
})
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err := c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.False(meta.Hit)
// Get, should not fetch, verified via the mock assertions above
req = TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err = c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.True(meta.Hit)
// Sleep for the expiry
time.Sleep(500 * time.Millisecond)
// Get, should fetch
req = TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err = c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.False(meta.Hit, "the fetch should not have re-populated the cache "+
"entry after it expired so this get should be a miss")
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that entries expire for background refresh types that return before any
// watcher re-fetches.
func TestCacheGet_expireBackgroudRefresh(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
LastGetTTL: 400 * time.Millisecond,
Refresh: true,
RefreshTimer: 0,
SupportsBlocking: true,
})
defer typ.AssertExpectations(t)
c := New(Options{})
// Register the type with a timeout
c.RegisterType("t", typ)
ctrlCh := make(chan struct{})
// Configure the type
typ.On("Fetch", mock.Anything, mock.Anything).
Return(func(o FetchOptions, r Request) FetchResult {
if o.MinIndex == 4 {
// Simulate returning from fetch (after a timeout with no value change)
// at a time controlled by the test to ensure we interleave requests.
<-ctrlCh
}
return FetchResult{Value: 8, Index: 4}
}, func(o FetchOptions, r Request) error {
return nil
})
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err := c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.False(meta.Hit)
// Get, should not fetch, verified via the mock assertions above
req = TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err = c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.True(meta.Hit)
// Sleep for the expiry
time.Sleep(500 * time.Millisecond)
// Now (after expiry) let the fetch call return
close(ctrlCh)
// Get, should fetch (it didn't originally because the fetch return would
// re-insert the value back into the cache and make it live forever).
req = TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err = c.Get(context.Background(), "t", req)
require.NoError(err)
require.Equal(8, result)
require.Equal(uint64(4), meta.Index)
require.False(meta.Hit, "the fetch should not have re-populated the cache "+
"entry after it expired so this get should be a miss")
// Sleep a tiny bit just to let maybe some background calls happen // Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call // then verify that we still only got the one call
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
@ -931,10 +1085,14 @@ func TestCacheGet_expireClose(t *testing.T) {
type testCloser struct { type testCloser struct {
closed uint32 closed uint32
closeFn func()
} }
func (t *testCloser) Close() error { func (t *testCloser) Close() error {
atomic.SwapUint32(&t.closed, 1) atomic.SwapUint32(&t.closed, 1)
if t.closeFn != nil {
t.closeFn()
}
return nil return nil
} }