diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index c03224eed..54f2707e6 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -192,7 +192,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
 	key := c.entryKey(&info)
 
 	// First time through
-	var attempt uint
+	first := true
 
 	// timeoutCh for watching our timeout
 	var timeoutCh <-chan time.Time
@@ -209,7 +209,7 @@ RETRY_GET:
 	// we have.
 	if ok && entry.Valid {
 		if info.MinIndex == 0 || info.MinIndex < entry.Index {
-			if attempt == 0 {
+			if first {
 				metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
 				atomic.AddUint64(&c.hits, 1)
 			}
@@ -220,7 +220,11 @@
 			c.entriesExpiryHeap.Fix(entry.Expiry)
 			c.entriesLock.Unlock()
 
-			return entry.Value, entry.Error
+			// We purposely do not return an error here since the cache
+			// only works with fetching values that either have a value
+			// or have an error, but not both. The Error may be non-nil
+			// in the entry because of this to note future fetch errors.
+			return entry.Value, nil
 		}
 	}
 
@@ -229,11 +233,11 @@
 	// a retry loop getting the same error for the entire duration of the
 	// timeout. Instead, we make one effort to fetch a new value, and if
 	// there was an error, we return.
-	if attempt > 0 && entry.Error != nil {
+	if !first && entry.Error != nil {
 		return entry.Value, entry.Error
 	}
 
-	if attempt == 0 {
+	if first {
 		// Record the miss if its our first time through
 		atomic.AddUint64(&c.misses, 1)
 
@@ -248,7 +252,7 @@
 	}
 
 	// No longer our first time through
-	attempt++
+	first = false
 
 	// Set our timeout channel if we must
 	if info.Timeout > 0 && timeoutCh == nil {
@@ -257,7 +261,7 @@
 
 	// At this point, we know we either don't have a value at all or the
 	// value we have is too old. We need to wait for new data.
-	waiterCh, err := c.fetch(t, key, r, true, attempt)
+	waiterCh, err := c.fetch(t, key, r, true, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -296,16 +300,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
 		return nil, fmt.Errorf("unknown type in cache: %s", t)
 	}
 
-	// If we're over the attempt minimum, start an exponential backoff.
-	if attempt > CacheRefreshBackoffMin {
-		waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
-		if waitTime > CacheRefreshMaxWait {
-			waitTime = CacheRefreshMaxWait
-		}
-
-		time.Sleep(waitTime)
-	}
-
 	// We acquire a write lock because we may have to set Fetching to true.
 	c.entriesLock.Lock()
 	defer c.entriesLock.Unlock()
@@ -354,7 +348,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
 			// A new value was given, so we create a brand new entry.
 			newEntry.Value = result.Value
 			newEntry.Index = result.Index
-			newEntry.Error = err
 
 			// This is a valid entry with a result
 			newEntry.Valid = true
@@ -374,12 +367,8 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
 			// Increment attempt counter
 			attempt++
 
-			// If the entry wasn't valid, we set an error. If it was valid,
-			// we don't set an error so that the prior value can continue
-			// being used. This will be evicted if the TTL comes up.
-			if !newEntry.Valid {
-				newEntry.Error = err
-			}
+			// Set the error that should be used if the fetch is failing.
+			newEntry.Error = err
 		}
 
 		// Create a new waiter that will be used for the next fetch.
@@ -448,6 +437,16 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
 		return
 	}
 
+	// If we're over the attempt minimum, start an exponential backoff.
+	if attempt > CacheRefreshBackoffMin {
+		waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
+		if waitTime > CacheRefreshMaxWait {
+			waitTime = CacheRefreshMaxWait
+		}
+
+		time.Sleep(waitTime)
+	}
+
 	// If we have a timer, wait for it
 	if opts.RefreshTimer > 0 {
 		time.Sleep(opts.RefreshTimer)
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index 1f643ed9f..0d8ecb5b0 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -285,16 +285,10 @@ func TestCacheGet_blockingIndexBackoff(t *testing.T) {
 	resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
 	TestCacheGetChResult(t, resultCh, 1)
 
-	// Fetch should block
+	// Fetch should not block and should return error
 	resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
 		Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute}))
-
-	// Should block
-	select {
-	case <-resultCh:
-		t.Fatal("should block")
-	case <-time.After(50 * time.Millisecond):
-	}
+	TestCacheGetChResult(t, resultCh, nil)
 
 	// Wait a bit
 	time.Sleep(100 * time.Millisecond)
diff --git a/agent/cache/testing.go b/agent/cache/testing.go
index 365dc3b4e..46f072e42 100644
--- a/agent/cache/testing.go
+++ b/agent/cache/testing.go
@@ -44,7 +44,9 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface
 		if !reflect.DeepEqual(result, expected) {
 			t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected)
 		}
+	case <-time.After(50 * time.Millisecond):
+		t.Fatalf("Result not sent on channel")
 	}
 }
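For context, a minimal standalone sketch of the backoff calculation that the patch moves from fetch into refresh. The formula mirrors the added hunk in (*Cache).refresh; the constant values used here (a minimum of 3 attempts and a 1 minute cap) are assumptions standing in for the package's CacheRefreshBackoffMin and CacheRefreshMaxWait, not values confirmed by this diff.

// backoff_sketch.go: illustrative only, not part of the patch. Mirrors the
// wait-time logic added to (*Cache).refresh above. The constant values below
// are assumptions, not the package's actual CacheRefreshBackoffMin and
// CacheRefreshMaxWait.
package main

import (
	"fmt"
	"time"
)

const (
	backoffMin uint          = 3               // assumed stand-in for CacheRefreshBackoffMin
	maxWait    time.Duration = 1 * time.Minute // assumed stand-in for CacheRefreshMaxWait
)

// backoffWait returns how long a refresh would sleep before the next fetch:
// zero until the failed-attempt count exceeds backoffMin, then doubling each
// attempt until it is capped at maxWait.
func backoffWait(attempt uint) time.Duration {
	if attempt <= backoffMin {
		return 0
	}
	waitTime := (1 << (attempt - backoffMin)) * time.Second
	if waitTime > maxWait {
		waitTime = maxWait
	}
	return waitTime
}

func main() {
	for attempt := uint(1); attempt <= 12; attempt++ {
		fmt.Printf("attempt %2d -> wait %s\n", attempt, backoffWait(attempt))
	}
}

With these assumed values the wait stays at zero for the first few attempts, then grows 2s, 4s, 8s, and so on until it is capped at one minute.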