@@ -87,8 +87,8 @@ var Counters = []prometheus.CounterDefinition{
 // Constants related to refresh backoff. We probably don't ever need to
 // make these configurable knobs since they primarily exist to lower load.
 const (
-    DefaultCacheRefreshBackoffMin = 3               // 3 attempts before backing off
-    DefaultCacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
+    CacheRefreshBackoffMin = 3               // 3 attempts before backing off
+    CacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time

     // The following constants are default values for the cache entry
     // rate limiter settings.
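Annotation (not part of the patch): the two constants above drive the retry curve used by `backOffWait` at the bottom of this diff. A standalone, runnable sketch of that curve, with `lib.RandomStagger` approximated by `math/rand` jitter (all names here are illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	refreshBackoffMin = 3               // attempts tolerated before backing off
	refreshMaxWait    = 1 * time.Minute // cap on the backoff wait
)

// wait mirrors backOffWait below: zero wait for the first three failures,
// then 2s, 4s, 8s, ... doubling per failure, capped at one minute, plus
// up to one extra waitTime of jitter.
func wait(failures uint) time.Duration {
	if failures <= refreshBackoffMin {
		return 0
	}
	shift := failures - refreshBackoffMin
	waitTime := refreshMaxWait
	if shift < 31 {
		waitTime = (1 << shift) * time.Second
	}
	if waitTime > refreshMaxWait {
		waitTime = refreshMaxWait
	}
	// Stand-in for lib.RandomStagger, which returns a random duration
	// up to its argument.
	return waitTime + time.Duration(rand.Int63n(int64(waitTime)))
}

func main() {
	for f := uint(1); f <= 10; f++ {
		fmt.Printf("failures=%2d wait=%v\n", f, wait(f))
	}
}
```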
@@ -141,7 +141,10 @@ type Cache struct {
     entriesLock       sync.RWMutex
     entries           map[string]cacheEntry
     entriesExpiryHeap *ttlcache.ExpiryHeap
-    lastGoroutineID   uint64
+
+    fetchLock    sync.Mutex
+    lastFetchID  uint64
+    fetchHandles map[string]fetchHandle

     // stopped is used as an atomic flag to signal that the Cache has been
     // discarded so background fetches and expiry processing should stop.
@@ -154,6 +157,11 @@ type Cache struct {
     rateLimitCancel context.CancelFunc
 }

+type fetchHandle struct {
+    id     uint64
+    stopCh chan struct{}
+}
+
 // typeEntry is a single type that is registered with a Cache.
 type typeEntry struct {
     // Name that was used to register the Type
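Annotation (not part of the patch): `fetchHandle` pairs an owner id with a stop channel so that a replacement fetcher can terminate its predecessor. A minimal, runnable sketch of that ownership pattern (all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type handle struct {
	id     uint64
	stopCh chan struct{}
}

type registry struct {
	mu      sync.Mutex
	lastID  uint64
	handles map[string]handle
}

// getOrReplace closes the previous worker's stop channel (if any) and
// registers a fresh handle: exactly one live worker per key.
func (r *registry) getOrReplace(key string) handle {
	r.mu.Lock()
	defer r.mu.Unlock()
	if prev, ok := r.handles[key]; ok {
		close(prev.stopCh)
	}
	r.lastID++
	h := handle{id: r.lastID, stopCh: make(chan struct{})}
	r.handles[key] = h
	return h
}

func main() {
	r := &registry{handles: make(map[string]handle)}
	for i := 0; i < 3; i++ {
		h := r.getOrReplace("svc")
		go func(h handle) {
			<-h.stopCh
			fmt.Printf("worker %d stopped\n", h.id)
		}(h)
	}
	time.Sleep(50 * time.Millisecond) // workers 1 and 2 print; 3 still owns the key
}
```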
@@ -199,13 +207,6 @@ type Options struct {
     EntryFetchMaxBurst int
     // EntryFetchRate represents the max calls/sec for a single cache entry
     EntryFetchRate rate.Limit
-
-    // CacheRefreshBackoffMin is the number of attempts to wait before backing off.
-    // Mostly configurable just for testing.
-    CacheRefreshBackoffMin uint
-    // CacheRefreshMaxWait is the maximum backoff wait time.
-    // Mostly configurable just for testing.
-    CacheRefreshMaxWait time.Duration
 }

 // Equal return true if both options are equivalent
@@ -221,12 +222,6 @@ func applyDefaultValuesOnOptions(options Options) Options {
     if options.EntryFetchMaxBurst == 0 {
         options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
     }
-    if options.CacheRefreshBackoffMin == 0 {
-        options.CacheRefreshBackoffMin = DefaultCacheRefreshBackoffMin
-    }
-    if options.CacheRefreshMaxWait == 0 {
-        options.CacheRefreshMaxWait = DefaultCacheRefreshMaxWait
-    }
     if options.Logger == nil {
         options.Logger = hclog.New(nil)
     }
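Annotation (not part of the patch): the removed lines follow Go's zero-value defaulting idiom, so a caller can pass an empty Options and still get working backoff settings. A compact sketch of the idiom under illustrative names:

```go
package main

import (
	"fmt"
	"time"
)

type options struct {
	refreshBackoffMin uint
	refreshMaxWait    time.Duration
}

// applyDefaults fills any zero-valued field, mirroring the shape of
// applyDefaultValuesOnOptions in the old side of this hunk.
func applyDefaults(o options) options {
	if o.refreshBackoffMin == 0 {
		o.refreshBackoffMin = 3
	}
	if o.refreshMaxWait == 0 {
		o.refreshMaxWait = time.Minute
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", applyDefaults(options{}))
	fmt.Printf("%+v\n", applyDefaults(options{refreshMaxWait: 5 * time.Second}))
}
```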
@@ -242,6 +237,7 @@ func New(options Options) *Cache {
         types:             make(map[string]typeEntry),
         entries:           make(map[string]cacheEntry),
         entriesExpiryHeap: ttlcache.NewExpiryHeap(),
+        fetchHandles:      make(map[string]fetchHandle),
         stopCh:            make(chan struct{}),
         options:           options,
         rateLimitContext:  ctx,
@@ -411,23 +407,11 @@ func (c *Cache) getEntryLocked(
     // Check if re-validate is requested. If so the first time round the
     // loop is not a hit but subsequent ones should be treated normally.
     if !tEntry.Opts.Refresh && info.MustRevalidate {
-        // It is important to note that this block ONLY applies when we are not
-        // in indefinite refresh mode (where the underlying goroutine will
-        // continue to re-query for data).
-        //
-        // In this mode goroutines have a 1:1 relationship to RPCs that get
-        // executed, and importantly they DO NOT SLEEP after executing.
-        //
-        // This means that a running goroutine for this cache entry extremely
-        // strongly implies that the RPC has not yet completed, which is why
-        // this check works for the revalidation-avoidance optimization here.
-        if entry.GoroutineID != 0 {
-            // There is an active goroutine performing a blocking query for
-            // this data, which has not returned.
-            //
-            // We can logically deduce that the contents of the cache are
-            // actually current, and we can simply return this while leaving
-            // the blocking query alone.
+        if entry.Fetching {
+            // There is an active blocking query for this data, which has not
+            // returned. We can logically deduce that the contents of the cache
+            // are actually current, and we can simply return this while
+            // leaving the blocking query alone.
             return true, true, entry
         }
         return true, false, entry
@@ -557,7 +541,7 @@ RETRY_GET:

     // At this point, we know we either don't have a value at all or the
     // value we have is too old. We need to wait for new data.
-    waiterCh := c.fetch(key, r)
+    waiterCh := c.fetch(key, r, true, 0, false)

     // No longer our first time through
     first = false
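Annotation (not part of the patch): Get blocks on the channel returned by fetch. Receiving from a closed channel never blocks, which is why an already-satisfied fetch can be signalled by handing back a pre-closed channel. A runnable sketch of both waiter shapes (illustrative names):

```go
package main

import (
	"fmt"
	"time"
)

// wait demonstrates the two waiter shapes fetch can hand back: an
// already-closed channel (data is current, returns immediately) and a
// live channel closed later by the background fetcher.
func wait(name string, ch <-chan struct{}) {
	start := time.Now()
	<-ch
	fmt.Printf("%s woke after %v\n", name, time.Since(start).Round(time.Millisecond))
}

func main() {
	done := make(chan struct{})
	close(done) // the "entry already valid" case
	wait("closed", done)

	pending := make(chan struct{})
	go func() {
		time.Sleep(20 * time.Millisecond) // pretend blocking query
		close(pending)                    // wakes every waiter at once
	}()
	wait("pending", pending)
}
```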
@@ -584,36 +568,46 @@ func makeEntryKey(t, dc, peerName, token, key string) string {
     return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
 }

-// fetch triggers a new background fetch for the given Request. If a background
-// fetch is already running or a goroutine to manage that still exists for a
-// matching Request, the waiter channel for that request is returned. The
-// effect of this is that there is only ever one blocking query and goroutine
-// for any matching requests.
-func (c *Cache) fetch(key string, r getOptions) <-chan struct{} {
+// fetch triggers a new background fetch for the given Request. If a
+// background fetch is already running for a matching Request, the waiter
+// channel for that request is returned. The effect of this is that there
+// is only ever one blocking query for any matching requests.
+//
+// If allowNew is true then the fetch should create the cache entry
+// if it doesn't exist. If this is false, then fetch will do nothing
+// if the entry doesn't exist. This latter case is to support refreshing.
+func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} {
+    // We acquire a write lock because we may have to set Fetching to true.
     c.entriesLock.Lock()
     defer c.entriesLock.Unlock()

     ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)

-    switch {
-    case ok && entryValid:
-        // This handles the case where a fetch succeeded after checking for its
-        // existence in getWithIndex. This ensures that we don't miss updates.
+    // This handles the case where a fetch succeeded after checking for its existence in
+    // getWithIndex. This ensures that we don't miss updates.
+    if ok && entryValid && !ignoreExisting {
         ch := make(chan struct{})
         close(ch)
         return ch
+    }

-    case ok && entry.GoroutineID != 0:
-        // If we already have an entry and there's a goroutine to keep it
-        // refreshed then don't spawn another one to do the same work.
-        //
-        // Return the currently active waiter.
+    // If we aren't allowing new values and we don't have an existing value,
+    // return immediately. We return an immediately-closed channel so nothing
+    // blocks.
+    if !ok && !allowNew {
+        ch := make(chan struct{})
+        close(ch)
+        return ch
+    }
+
+    // If we already have an entry and it is actively fetching, then return
+    // the currently active waiter.
+    if ok && entry.Fetching {
         return entry.Waiter
+    }

-    case !ok:
-        // If we don't have an entry, then create it. The entry must be marked
-        // as invalid so that it isn't returned as a valid value for a zero
-        // index.
+    // If we don't have an entry, then create it. The entry must be marked
+    // as invalid so that it isn't returned as a valid value for a zero index.
+    if !ok {
         entry = cacheEntry{
             Valid:  false,
             Waiter: make(chan struct{}),
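Annotation (not part of the patch): the `Fetching` flag set under `entriesLock` is what collapses concurrent identical requests onto one blocking query. A self-contained sketch of that single-flight shape (illustrative names, a `time.Sleep` standing in for the RPC):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	fetching bool
	waiter   chan struct{}
}

type cache struct {
	mu      sync.Mutex
	entries map[string]*entry
	fetches int
}

// fetch mirrors the single-flight shape of Cache.fetch: the first caller
// starts the blocking query, later callers reuse the same waiter.
func (c *cache) fetch(key string) <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok && e.fetching {
		return e.waiter
	}
	e := &entry{fetching: true, waiter: make(chan struct{})}
	c.entries[key] = e
	c.fetches++
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the RPC
		c.mu.Lock()
		e.fetching = false
		c.mu.Unlock()
		close(e.waiter)
	}()
	return e.waiter
}

func main() {
	c := &cache{entries: make(map[string]*entry)}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); <-c.fetch("nodes") }()
	}
	wg.Wait()
	fmt.Println("fetches issued:", c.fetches) // typically 1, not 5
}
```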
@@ -624,100 +618,27 @@ func (c *Cache) fetch(key string, r getOptions) <-chan struct{} {
         }
     }

-    // Assign each background fetching goroutine a unique ID and fingerprint
-    // the cache entry with the same ID. This way if the cache entry is ever
-    // cleaned up due to expiry and later recreated the old goroutine can
-    // detect that and terminate rather than leak and do double work.
-    c.lastGoroutineID++
-    entry.GoroutineID = c.lastGoroutineID
+    // Set that we're fetching to true, which makes it so that future
+    // identical calls to fetch will return the same waiter rather than
+    // perform multiple fetches.
+    entry.Fetching = true
     c.entries[key] = entry
     metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
     metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))

-    // The actual Fetch must be performed in a goroutine.
-    go c.launchBackgroundFetcher(entry.GoroutineID, key, r)
-
-    return entry.Waiter
-}
-
-func (c *Cache) launchBackgroundFetcher(goroutineID uint64, key string, r getOptions) {
-    defer func() {
-        c.entriesLock.Lock()
-        defer c.entriesLock.Unlock()
-        entry, ok := c.entries[key]
-        if ok && entry.GoroutineID == goroutineID {
-            entry.GoroutineID = 0
-            c.entries[key] = entry
-        }
-    }()
-
-    var attempt uint
-    for {
-        shouldStop, shouldBackoff := c.runBackgroundFetcherOnce(goroutineID, key, r)
-        if shouldStop {
-            return
-        }
-
-        if shouldBackoff {
-            attempt++
-        } else {
-            attempt = 0
-        }
-
-        // If we're over the attempt minimum, start an exponential backoff.
-        wait := backOffWait(c.options, attempt)
-
-        // If we have a timer, wait for it
-        wait += r.TypeEntry.Opts.RefreshTimer
-
-        select {
-        case <-time.After(wait):
-        case <-c.stopCh:
-            return // Check if cache was stopped
-        }
-
-        // Trigger.
-        r.Info.MustRevalidate = false
-        r.Info.MinIndex = 0
-
-        // We acquire a write lock because we may have to set Fetching to true.
-        c.entriesLock.Lock()
-        entry, ok := c.entries[key]
-        if !ok || entry.GoroutineID != goroutineID {
-            // If we don't have an existing entry, return immediately.
-            //
-            // Also if we already have an entry and it is actively fetching, then
-            // return immediately.
-            //
-            // If we've somehow lost control of the entry, also return.
-            c.entriesLock.Unlock()
-            return
-        }
-        c.entries[key] = entry
-        metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
-        metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
-        c.entriesLock.Unlock()
-    }
-}
-
-func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOptions) (shouldStop, shouldBackoff bool) {
-    // Freshly re-read this, rather than relying upon the caller to fetch it
-    // and pass it in.
-    c.entriesLock.RLock()
-    entry, ok := c.entries[key]
-    c.entriesLock.RUnlock()
-
-    if !ok || entry.GoroutineID != goroutineID {
-        // If we don't have an existing entry, return immediately.
-        //
-        // Also if something weird has happened to orphan this goroutine, also
-        // return immediately.
-        return true, false
-    }
-
     tEntry := r.TypeEntry
-    { // NOTE: this indentation is here to facilitate the PR review diff only
+
+    // The actual Fetch must be performed in a goroutine. Ensure that we only
+    // have one in-flight at a time, but don't use a deferred
+    // context.WithCancel style termination so that these things outlive their
+    // requester.
+    //
+    // By the time we get here the system WANTS to make a replacement fetcher, so
+    // we terminate the prior one and replace it.
+    handle := c.getOrReplaceFetchHandle(key)
+    go func(handle fetchHandle) {
+        defer c.deleteFetchHandle(key, handle.id)
+
         // If we have background refresh and currently are in "disconnected" state,
         // waiting for a response might mean we mark our results as stale for up to
         // 10 minutes (max blocking timeout) after connection is restored. To reduce
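Annotation (not part of the patch): the removed `launchBackgroundFetcher` reduces to a fetch/score/sleep loop. A runnable sketch of that loop shape, where `runOnce` is a stand-in for `runBackgroundFetcherOnce` (all names illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// refreshLoop runs one fetch per round, grows or resets the failure
// streak, then sleeps a backoff-derived wait before the next round.
func refreshLoop(runOnce func() (stop, backoff bool), wait func(failures uint) time.Duration) {
	var attempt uint
	for {
		stop, backoff := runOnce()
		if stop {
			return
		}
		if backoff {
			attempt++
		} else {
			attempt = 0
		}
		time.Sleep(wait(attempt))
	}
}

func main() {
	n := 0
	refreshLoop(
		func() (bool, bool) {
			n++
			fmt.Println("fetch", n)
			return n >= 3, n == 1 // stop after 3 rounds; back off once
		},
		func(failures uint) time.Duration {
			return time.Duration(failures*10) * time.Millisecond
		},
	)
}
```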
@@ -731,7 +652,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             c.entriesLock.Lock()
             defer c.entriesLock.Unlock()
             entry, ok := c.entries[key]
-            if !ok || entry.RefreshLostContact.IsZero() || entry.GoroutineID != goroutineID {
+            if !ok || entry.RefreshLostContact.IsZero() {
                 return
             }
             entry.RefreshLostContact = time.Time{}
@@ -755,15 +676,12 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
                 Index: entry.Index,
             }
         }

         if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
             if connectedTimer != nil {
                 connectedTimer.Stop()
             }
             entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
-            // NOTE: this can only happen when the entire cache is being
-            // shutdown and isn't something that can happen normally.
-            return true, false
+            return
         }
         // Start building the new entry by blocking on the fetch.
         result, err := r.Fetch(fOpts)
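Annotation (not part of the patch): `FetchRateLimiter` is a `golang.org/x/time/rate.Limiter` (the `rate.Limit` option type appears earlier in this diff), and `Wait` unblocks with an error when the shared context is cancelled at cache shutdown. A runnable sketch of that behavior:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Mirrors entry.FetchRateLimiter.Wait(c.rateLimitContext): each fetch
	// reserves a token, and cancelling the shared context unblocks every
	// waiter with an error.
	limiter := rate.NewLimiter(rate.Limit(2), 1) // 2 fetches/sec, burst 1
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		time.Sleep(1200 * time.Millisecond)
		cancel() // stand-in for Cache shutdown
	}()

	for i := 0; i < 5; i++ {
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("fetch aborted:", err)
			return
		}
		fmt.Println("fetch", i, "allowed at", time.Now().Format("15:04:05.000"))
	}
}
```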
@@ -771,8 +689,17 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             connectedTimer.Stop()
         }

+        // If we were stopped while waiting on a blocking query now would be a
+        // good time to detect that.
+        select {
+        case <-handle.stopCh:
+            return
+        default:
+        }
+
         // Copy the existing entry to start.
         newEntry := entry
+        newEntry.Fetching = false

         // Importantly, always reset the Error. Having both Error and a Value that
         // are non-nil is allowed in the cache entry but it indicates that the Error
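Annotation (not part of the patch): the added `select` with a `default` arm is Go's non-blocking poll of a channel; it notices a `close(stopCh)` that happened while the goroutine sat in the blocking query. The idiom in isolation (illustrative names):

```go
package main

import "fmt"

// stopped polls a channel without blocking: the default case fires
// whenever the stop channel has not been closed yet.
func stopped(stopCh <-chan struct{}) bool {
	select {
	case <-stopCh:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan struct{})
	fmt.Println(stopped(ch)) // false: still the current fetcher
	close(ch)
	fmt.Println(stopped(ch)) // true: a replacement took over
}
```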
@@ -828,7 +755,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp

         if result.Index > 0 {
             // Reset the attempts counter so we don't have any backoff
-            shouldBackoff = false
+            attempt = 0
         } else {
             // Result having a zero index is an implicit error case. There was no
             // actual error but it implies the RPC found in index (nothing written
@@ -843,7 +770,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
             // state it can be considered a bug in the RPC implementation (to ever
             // return a zero index) however since it can happen this is a safety net
             // for the future.
-            shouldBackoff = true
+            attempt++
         }

         // If we have refresh active, this successful response means cache is now
@@ -863,7 +790,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         metrics.IncrCounterWithLabels([]string{"cache", tEntry.Name, "fetch_error"}, 1, labels)

         // Increment attempt counter
-        shouldBackoff = true
+        attempt++

         // If we are refreshing and just failed, updated the lost contact time as
         // our cache will be stale until we get successfully reconnected. We only
@@ -880,7 +807,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         // Set our entry
         c.entriesLock.Lock()

-        if currEntry, ok := c.entries[key]; !ok || currEntry.GoroutineID != goroutineID {
+        if _, ok := c.entries[key]; !ok {
             // This entry was evicted during our fetch. DON'T re-insert it or fall
             // through to the refresh loop below otherwise it will live forever! In
             // theory there should not be any Get calls waiting on entry.Waiter since
@@ -893,7 +820,7 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp

             // Trigger any waiters that are around.
             close(entry.Waiter)
-            return true, false
+            return
         }

         // If this is a new entry (not in the heap yet), then setup the
@@ -918,22 +845,79 @@ func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOp
         // request back up again shortly but in the general case this prevents
         // spamming the logs with tons of ACL not found errors for days.
         if tEntry.Opts.Refresh && !preventRefresh {
-            return false, shouldBackoff
-        }
-    }
-
-    return true, false
-}
-
-func backOffWait(opts Options, failures uint) time.Duration {
-    if failures > opts.CacheRefreshBackoffMin {
-        shift := failures - opts.CacheRefreshBackoffMin
-        waitTime := opts.CacheRefreshMaxWait
+            // Check if cache was stopped
+            if atomic.LoadUint32(&c.stopped) == 1 {
+                return
+            }
+
+            // If we're over the attempt minimum, start an exponential backoff.
+            wait := backOffWait(attempt)
+
+            // If we have a timer, wait for it
+            wait += tEntry.Opts.RefreshTimer
+
+            select {
+            case <-time.After(wait):
+            case <-handle.stopCh:
+                return
+            }
+
+            // Trigger. The "allowNew" field is false because in the time we were
+            // waiting to refresh we may have expired and got evicted. If that
+            // happened, we don't want to create a new entry.
+            r.Info.MustRevalidate = false
+            r.Info.MinIndex = 0
+            c.fetch(key, r, false, attempt, true)
+        }
+    }(handle)
+
+    return entry.Waiter
+}
+
+func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
+    c.fetchLock.Lock()
+    defer c.fetchLock.Unlock()
+
+    if prevHandle, ok := c.fetchHandles[key]; ok {
+        close(prevHandle.stopCh)
+    }
+
+    c.lastFetchID++
+
+    handle := fetchHandle{
+        id:     c.lastFetchID,
+        stopCh: make(chan struct{}),
+    }
+
+    c.fetchHandles[key] = handle
+
+    return handle
+}
+
+func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
+    c.fetchLock.Lock()
+    defer c.fetchLock.Unlock()
+
+    // Only remove a fetchHandle if it's YOUR fetchHandle.
+    handle, ok := c.fetchHandles[key]
+    if !ok {
+        return
+    }
+
+    if handle.id == fetchID {
+        delete(c.fetchHandles, key)
+    }
+}
+
+func backOffWait(failures uint) time.Duration {
+    if failures > CacheRefreshBackoffMin {
+        shift := failures - CacheRefreshBackoffMin
+        waitTime := CacheRefreshMaxWait
         if shift < 31 {
             waitTime = (1 << shift) * time.Second
         }
-        if waitTime > opts.CacheRefreshMaxWait {
-            waitTime = opts.CacheRefreshMaxWait
+        if waitTime > CacheRefreshMaxWait {
+            waitTime = CacheRefreshMaxWait
         }
         return waitTime + lib.RandomStagger(waitTime)
     }
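Annotation (not part of the patch): the id comparison in `deleteFetchHandle` means a finished fetcher only unregisters itself, never a successor that replaced it mid-flight. The guard in isolation (illustrative names):

```go
package main

import (
	"fmt"
	"sync"
)

type reg struct {
	mu      sync.Mutex
	handles map[string]uint64 // key -> current owner id
}

// deleteIfOwner mirrors deleteFetchHandle: remove the map entry only when
// the id still matches, so a replacement installed in the meantime keeps
// its registration.
func (r *reg) deleteIfOwner(key string, id uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.handles[key]; ok && cur == id {
		delete(r.handles, key)
	}
}

func main() {
	r := &reg{handles: map[string]uint64{"svc": 2}}
	r.deleteIfOwner("svc", 1) // stale fetcher: no-op
	fmt.Println(r.handles)    // map[svc:2]
	r.deleteIfOwner("svc", 2) // current owner: removed
	fmt.Println(r.handles)    // map[]
}
```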