Stop background refresh of cached data for requests that result in ACL not found errors (#9738)
This commit is contained in:
parent
5a50b26767
commit
19c99dc104
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
cache: Prevent spamming the logs for days when a cached request encounters an "ACL not found" error.
|
||||||
|
```
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/armon/go-metrics/prometheus"
|
"github.com/armon/go-metrics/prometheus"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/lib/ttlcache"
|
"github.com/hashicorp/consul/lib/ttlcache"
|
||||||
)
|
)
|
||||||
|
@ -655,6 +656,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||||
newEntry.State = result.State
|
newEntry.State = result.State
|
||||||
}
|
}
|
||||||
|
|
||||||
|
preventRefresh := acl.IsErrNotFound(err)
|
||||||
|
|
||||||
// Error handling
|
// Error handling
|
||||||
if err == nil {
|
if err == nil {
|
||||||
labels := []metrics.Label{{Name: "result_not_modified", Value: strconv.FormatBool(result.NotModified)}}
|
labels := []metrics.Label{{Name: "result_not_modified", Value: strconv.FormatBool(result.NotModified)}}
|
||||||
|
@ -688,9 +691,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||||
newEntry.RefreshLostContact = time.Time{}
|
newEntry.RefreshLostContact = time.Time{}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// TODO (mkeeler) maybe change the name of this label to be more indicative of it just
|
||||||
|
// stopping the background refresh
|
||||||
|
labels := []metrics.Label{{Name: "fatal", Value: strconv.FormatBool(preventRefresh)}}
|
||||||
|
|
||||||
// TODO(kit): Add tEntry.Name to label on fetch_error and deprecate second write
|
// TODO(kit): Add tEntry.Name to label on fetch_error and deprecate second write
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
|
metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_error"}, 1, labels)
|
||||||
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1)
|
metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1, labels)
|
||||||
|
|
||||||
// Increment attempt counter
|
// Increment attempt counter
|
||||||
attempt++
|
attempt++
|
||||||
|
@ -725,7 +732,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||||
|
|
||||||
// If refresh is enabled, run the refresh in due time. The refresh
|
// If refresh is enabled, run the refresh in due time. The refresh
|
||||||
// below might block, but saves us from spawning another goroutine.
|
// below might block, but saves us from spawning another goroutine.
|
||||||
if tEntry.Opts.Refresh {
|
//
|
||||||
|
// We want to have ACL not found errors stop cache refresh for the cases
|
||||||
|
// where the token used for the query was deleted. If the request
|
||||||
|
// was coming from a cache notification then it will start the
|
||||||
|
// request back up again shortly but in the general case this prevents
|
||||||
|
// spamming the logs with tons of ACL not found errors for days.
|
||||||
|
if tEntry.Opts.Refresh && !preventRefresh {
|
||||||
// Check if cache was stopped
|
// Check if cache was stopped
|
||||||
if atomic.LoadUint32(&c.stopped) == 1 {
|
if atomic.LoadUint32(&c.stopped) == 1 {
|
||||||
return
|
return
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/lib/ttlcache"
|
"github.com/hashicorp/consul/lib/ttlcache"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
)
|
)
|
||||||
|
@ -1512,6 +1513,124 @@ func TestCache_Prepopulate(t *testing.T) {
|
||||||
require.Equal(t, 17, result)
|
require.Equal(t, 17, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCache_RefreshLifeCycle(t *testing.T) {
|
||||||
|
typ := &MockType{}
|
||||||
|
t.Cleanup(func() { typ.AssertExpectations(t) })
|
||||||
|
|
||||||
|
typ.On("RegisterOptions").Times(0).Return(RegisterOptions{
|
||||||
|
// Maintain a blocking query, retry dropped connections quickly
|
||||||
|
Refresh: true,
|
||||||
|
SupportsBlocking: true,
|
||||||
|
RefreshTimer: 0 * time.Second,
|
||||||
|
QueryTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
|
|
||||||
|
makeRequest := func(index uint64) fakeRequest {
|
||||||
|
return fakeRequest{
|
||||||
|
info: RequestInfo{
|
||||||
|
Key: "v1",
|
||||||
|
Token: "token",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
MinIndex: index,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{
|
||||||
|
Value: true,
|
||||||
|
Index: 2,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
releaseSecondReq := make(chan time.Time)
|
||||||
|
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.PermissionDenied("forced error")).WaitUntil(releaseSecondReq)
|
||||||
|
|
||||||
|
releaseThirdReq := make(chan time.Time)
|
||||||
|
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.ErrNotFound).WaitUntil(releaseThirdReq)
|
||||||
|
|
||||||
|
c := New(Options{})
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
|
key := makeEntryKey("t", "dc1", "token", "v1")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// get the background refresh going
|
||||||
|
result, _, err := c.Get(ctx, "t", makeRequest(1))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, true, result)
|
||||||
|
|
||||||
|
// ensure that the entry is fetching again
|
||||||
|
c.entriesLock.Lock()
|
||||||
|
entry, ok := c.entries[key]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.True(t, entry.Fetching)
|
||||||
|
c.entriesLock.Unlock()
|
||||||
|
|
||||||
|
requestChan := make(chan error)
|
||||||
|
|
||||||
|
getError := func(index uint64) {
|
||||||
|
_, _, err := c.Get(ctx, "t", makeRequest(index))
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
requestChan <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
// background a call that will wait for a newer version
|
||||||
|
go getError(2)
|
||||||
|
|
||||||
|
// I really dislike the arbitrary sleep here. However we want to test out some of the
|
||||||
|
// branching in getWithIndex (called by Get) and that doesn't expose any way for us to
|
||||||
|
// know when that goroutine has gotten far enough and is waiting on the latest value.
|
||||||
|
// Therefore the only thing we can do for now is to sleep long enough to let that
|
||||||
|
// goroutine progress far enough.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// release the blocking query to simulate an ACL permission denied error
|
||||||
|
close(releaseSecondReq)
|
||||||
|
|
||||||
|
// ensure we were woken up and see the permission denied error
|
||||||
|
select {
|
||||||
|
case err := <-requestChan:
|
||||||
|
require.True(t, acl.IsErrPermissionDenied(err))
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
require.Fail(t, "blocking cache Get never returned")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure that the entry is fetching again
|
||||||
|
c.entriesLock.Lock()
|
||||||
|
entry, ok = c.entries[key]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.True(t, entry.Fetching)
|
||||||
|
c.entriesLock.Unlock()
|
||||||
|
|
||||||
|
// background a call that will wait for a newer version - will result in an acl not found error
|
||||||
|
go getError(5)
|
||||||
|
|
||||||
|
// Same arbitrary sleep as the one after the second request and the same reasoning.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// release the blocking query to simulate an ACL not found error
|
||||||
|
close(releaseThirdReq)
|
||||||
|
|
||||||
|
// ensure we were woken up and see the ACL not found error
|
||||||
|
select {
|
||||||
|
case err := <-requestChan:
|
||||||
|
require.True(t, acl.IsErrNotFound(err))
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
require.Fail(t, "blocking cache Get never returned")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure that the ACL not found error killed off the background refresh
|
||||||
|
// but didn't remove it from the cache
|
||||||
|
c.entriesLock.Lock()
|
||||||
|
entry, ok = c.entries[key]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.False(t, entry.Fetching)
|
||||||
|
c.entriesLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
type fakeType struct {
|
type fakeType struct {
|
||||||
index uint64
|
index uint64
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue