diff --git a/.changelog/9978.txt b/.changelog/9978.txt new file mode 100644 index 000000000..de37677f5 --- /dev/null +++ b/.changelog/9978.txt @@ -0,0 +1,3 @@ +```release-note:bug +cache: fix a bug in the client agent cache where streaming could potentially leak resources. [[GH-9978](https://github.com/hashicorp/consul/pull/9978)]. +``` diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 8c64b8ac8..a73db9b4e 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -723,6 +723,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // Set our entry c.entriesLock.Lock() + if _, ok := c.entries[key]; !ok { + // This entry was evicted during our fetch. DON'T re-insert it or fall + // through to the refresh loop below otherwise it will live forever! In + // theory there should not be any Get calls waiting on entry.Waiter since + // they would have prevented the eviction, but in practice there may be + // due to timing and the fact that we don't update the TTL on the entry if + // errors are being returned for a while. So we do need to unblock them, + // which will mean they recreate the entry again right away and so "reset" + // to a good state anyway! + c.entriesLock.Unlock() + + // Trigger any waiters that are around. + close(entry.Waiter) + return + } + // If this is a new entry (not in the heap yet), then setup the // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index e7b6c939d..a21c7c7a0 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -802,7 +802,7 @@ func TestCacheGet_expire(t *testing.T) { // Wait for a non-trivial amount of time to sanity check the age increases at // least this amount. Note that this is not a fudge for some timing-dependent // background work it's just ensuring a non-trivial time elapses between the - // request above and below serilaly in this thread so short time is OK. + // request above and below serially in this thread so short time is OK. time.Sleep(5 * time.Millisecond) // Get, should not fetch, verified via the mock assertions above @@ -823,6 +823,160 @@ func TestCacheGet_expire(t *testing.T) { require.Equal(42, result) require.False(meta.Hit) + // Sleep a tiny bit just to let maybe some background calls happen then verify + // that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + +// Test that entries expire for background refresh types that cancel fetch on +// eviction. This is really a special case of the test below where the close +// behavior of the type forces the timing that causes the race but it's worth +// keeping explicitly anyway to make sure this behavior is supported and +// doesn't introduce any different races. +func TestCacheGet_expireBackgroudRefreshCancel(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + LastGetTTL: 400 * time.Millisecond, + Refresh: true, + RefreshTimer: 0, + SupportsBlocking: true, + }) + defer typ.AssertExpectations(t) + c := New(Options{}) + + // Register the type with a timeout + c.RegisterType("t", typ) + + // Create a cache state that is a closer that cancels the context on close + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + closer := &testCloser{ + closeFn: func() { + cancel() + }, + } + + // Configure the type + typ.On("Fetch", mock.Anything, mock.Anything). + Return(func(o FetchOptions, r Request) FetchResult { + return FetchResult{Value: 8, Index: 4, State: closer} + }, func(o FetchOptions, r Request) error { + if o.MinIndex == 4 { + // Simulate waiting for a new value on second call until the cache type + // is evicted + <-ctx.Done() + return ctx.Err() + } + return nil + }) + + // Get, should fetch + req := TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err := c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.False(meta.Hit) + + // Get, should not fetch, verified via the mock assertions above + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err = c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.True(meta.Hit) + + // Sleep for the expiry + time.Sleep(500 * time.Millisecond) + + // Get, should fetch + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err = c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.False(meta.Hit, "the fetch should not have re-populated the cache "+ + "entry after it expired so this get should be a miss") + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + +// Test that entries expire for background refresh types that return before any +// watcher re-fetches. +func TestCacheGet_expireBackgroudRefresh(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + LastGetTTL: 400 * time.Millisecond, + Refresh: true, + RefreshTimer: 0, + SupportsBlocking: true, + }) + defer typ.AssertExpectations(t) + c := New(Options{}) + + // Register the type with a timeout + c.RegisterType("t", typ) + + ctrlCh := make(chan struct{}) + + // Configure the type + typ.On("Fetch", mock.Anything, mock.Anything). + Return(func(o FetchOptions, r Request) FetchResult { + if o.MinIndex == 4 { + // Simulate returning from fetch (after a timeout with no value change) + // at a time controlled by the test to ensure we interleave requests. + <-ctrlCh + } + return FetchResult{Value: 8, Index: 4} + }, func(o FetchOptions, r Request) error { + return nil + }) + + // Get, should fetch + req := TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err := c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.False(meta.Hit) + + // Get, should not fetch, verified via the mock assertions above + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err = c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.True(meta.Hit) + + // Sleep for the expiry + time.Sleep(500 * time.Millisecond) + + // Now (after expiry) let the fetch call return + close(ctrlCh) + + // Get, should fetch (it didn't originally because the fetch return would + // re-insert the value back into the cache and make it live forever). + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err = c.Get(context.Background(), "t", req) + require.NoError(err) + require.Equal(8, result) + require.Equal(uint64(4), meta.Index) + require.False(meta.Hit, "the fetch should not have re-populated the cache "+ + "entry after it expired so this get should be a miss") + // Sleep a tiny bit just to let maybe some background calls happen // then verify that we still only got the one call time.Sleep(20 * time.Millisecond) @@ -930,11 +1084,15 @@ func TestCacheGet_expireClose(t *testing.T) { } type testCloser struct { - closed uint32 + closed uint32 + closeFn func() } func (t *testCloser) Close() error { atomic.SwapUint32(&t.closed, 1) + if t.closeFn != nil { + t.closeFn() + } return nil }