diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 3603c3151..6f9aec912 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -225,7 +225,28 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { // once. But be robust against panics. return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) } - return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex) + return c.getWithIndex(newGetOptions(tEntry, r)) +} + +// getOptions contains the arguments for a Get request. It is used in place of +// Request so that internal functions can modify Info without having to extract +// it from the Request each time. +type getOptions struct { + // Fetch is a closure over tEntry.Type.Fetch which provides the original + // Request from the caller. + Fetch func(opts FetchOptions) (FetchResult, error) + Info RequestInfo + TypeEntry typeEntry +} + +func newGetOptions(tEntry typeEntry, r Request) getOptions { + return getOptions{ + Fetch: func(opts FetchOptions) (FetchResult, error) { + return tEntry.Type.Fetch(opts, r) + }, + Info: r.CacheInfo(), + TypeEntry: tEntry, + } } // getEntryLocked retrieves a cache entry and checks if it is ready to be @@ -234,9 +255,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { func (c *Cache) getEntryLocked( tEntry typeEntry, key string, - maxAge time.Duration, - revalidate bool, - minIndex uint64, + info RequestInfo, ) (entryExists bool, entryValid bool, entry cacheEntry) { entry, ok := c.entries[key] if !entry.Valid { @@ -245,7 +264,7 @@ func (c *Cache) getEntryLocked( // Check index is not specified or lower than value, or the type doesn't // support blocking. - if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index { + if tEntry.Opts.SupportsBlocking && info.MinIndex > 0 && info.MinIndex >= entry.Index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value below. return true, false, entry @@ -253,13 +272,13 @@ func (c *Cache) getEntryLocked( // Check MaxAge is not exceeded if this is not a background refreshing type // and MaxAge was specified. - if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) { + if !tEntry.Opts.Refresh && info.MaxAge > 0 && entryExceedsMaxAge(info.MaxAge, entry) { return true, false, entry } // Check if re-validate is requested. If so the first time round the // loop is not a hit but subsequent ones should be treated normally. - if !tEntry.Opts.Refresh && revalidate { + if !tEntry.Opts.Refresh && info.MustRevalidate { return true, false, entry } @@ -273,17 +292,17 @@ func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool { // getWithIndex implements the main Get functionality but allows internal // callers (Watch) to manipulate the blocking index separately from the actual // request object. -func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { - info := r.CacheInfo() - if info.Key == "" { +func (c *Cache) getWithIndex(r getOptions) (interface{}, ResultMeta, error) { + if r.Info.Key == "" { metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) // If no key is specified, then we do not cache this request. // Pass directly through to the backend. - return c.fetchDirect(tEntry, r, minIndex) + result, err := r.Fetch(FetchOptions{MinIndex: r.Info.MinIndex}) + return result.Value, ResultMeta{}, err } - key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key) + key := makeEntryKey(r.TypeEntry.Name, r.Info.Datacenter, r.Info.Token, r.Info.Key) // First time through first := true @@ -294,19 +313,19 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte RETRY_GET: // Get the current value c.entriesLock.RLock() - _, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) + _, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info) c.entriesLock.RUnlock() if entryValid { meta := ResultMeta{Index: entry.Index} if first { - metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) + metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, "hit"}, 1) meta.Hit = true } // If refresh is enabled, calculate age based on whether the background // routine is still connected. - if tEntry.Opts.Refresh { + if r.TypeEntry.Opts.Refresh { meta.Age = time.Duration(0) if !entry.RefreshLostContact.IsZero() { meta.Age = time.Since(entry.RefreshLostContact) @@ -321,7 +340,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Update(tEntry.Opts.LastGetTTL) + entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL) c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesLock.Unlock() @@ -356,23 +375,20 @@ RETRY_GET: // whether we're missing because we didn't have the data at all, // or if we're missing because we're blocking on a set index. missKey := "miss_block" - if minIndex == 0 { + if r.Info.MinIndex == 0 { missKey = "miss_new" } - metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1) + metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, missKey}, 1) } // Set our timeout channel if we must - if info.Timeout > 0 && timeoutCh == nil { - timeoutCh = time.After(info.Timeout) + if r.Info.Timeout > 0 && timeoutCh == nil { + timeoutCh = time.After(r.Info.Timeout) } // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first) - if err != nil { - return nil, ResultMeta{Index: entry.Index}, err - } + waiterCh := c.fetch(key, r, true, 0, false) // No longer our first time through first = false @@ -380,6 +396,7 @@ RETRY_GET: select { case <-waiterCh: // Our fetch returned, retry the get from the cache. + r.Info.MustRevalidate = false goto RETRY_GET case <-timeoutCh: @@ -400,20 +417,18 @@ func makeEntryKey(t, dc, token, key string) string { // If allowNew is true then the fetch should create the cache entry // if it doesn't exist. If this is false, then fetch will do nothing // if the entry doesn't exist. This latter case is to support refreshing. -func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { - info := r.CacheInfo() - +func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} { // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() - ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) + ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info) // This handles the case where a fetch succeeded after checking for its existence in // getWithIndex. This ensures that we don't miss updates. if ok && entryValid && !ignoreExisting { ch := make(chan struct{}) close(ch) - return ch, nil + return ch } // If we aren't allowing new values and we don't have an existing value, @@ -422,13 +437,13 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at if !ok && !allowNew { ch := make(chan struct{}) close(ch) - return ch, nil + return ch } // If we already have an entry and it is actively fetching, then return // the currently active waiter. if ok && entry.Fetching { - return entry.Waiter, nil + return entry.Waiter } // If we don't have an entry, then create it. The entry must be marked @@ -444,6 +459,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at c.entries[key] = entry metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries))) + tEntry := r.TypeEntry // The actual Fetch must be performed in a goroutine. go func() { // If we have background refresh and currently are in "disconnected" state, @@ -482,7 +498,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at } // Start building the new entry by blocking on the fetch. - result, err := tEntry.Type.Fetch(fOpts, r) + result, err := r.Fetch(fOpts) if connectedTimer != nil { connectedTimer.Stop() } @@ -618,20 +634,13 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // Trigger. The "allowNew" field is false because in the time we were // waiting to refresh we may have expired and got evicted. If that // happened, we don't want to create a new entry. - c.fetch(tEntry, key, r, false, attempt, 0, true, true) + r.Info.MustRevalidate = false + r.Info.MinIndex = 0 + c.fetch(key, r, false, attempt, true) } }() - return entry.Waiter, nil -} - -// fetchDirect fetches the given request with no caching. Because this -// bypasses the caching entirely, multiple matching requests will result -// in multiple actual RPC calls (unlike fetch). -func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { - // Fetch it with the min index specified directly by the request. - result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r) - return result.Value, ResultMeta{}, err + return entry.Waiter } func backOffWait(failures uint) time.Duration { diff --git a/agent/cache/watch.go b/agent/cache/watch.go index 1ed3ccad1..baf2759f9 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -65,7 +65,7 @@ func (c *Cache) Notify( } if tEntry.Opts.SupportsBlocking { - go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) + go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) return nil } @@ -73,11 +73,11 @@ func (c *Cache) Notify( if info.MaxAge == 0 { return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") } - go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) + go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) return nil } -func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) { +func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) @@ -90,7 +90,8 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req } // Blocking request - res, meta, err := c.getWithIndex(tEntry, r, index) + r.Info.MinIndex = index + res, meta, err := c.getWithIndex(r) // Check context hasn't been canceled if ctx.Err() != nil { @@ -136,7 +137,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req } } -func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { +func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { index := uint64(0) failures := uint(0) @@ -149,7 +150,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ } // Make the request - res, meta, err := c.getWithIndex(tEntry, r, index) + r.Info.MinIndex = index + res, meta, err := c.getWithIndex(r) // Check context hasn't been canceled if ctx.Err() != nil { @@ -204,8 +206,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // Calculate when the cached data's Age will get too stale and // need to be re-queried. When the data's Age already exceeds the // maxAge the pollWait value is left at 0 to immediately re-poll - if meta.Age <= maxAge { - wait = maxAge - meta.Age + if meta.Age <= r.Info.MaxAge { + wait = r.Info.MaxAge - meta.Age } // Add a small amount of random jitter to the polling time. One @@ -217,7 +219,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // and then immediately have to re-fetch again. That wouldn't // be terrible but it would expend a bunch more cpu cycles when // we can definitely avoid it. - wait += lib.RandomStagger(maxAge / 16) + wait += lib.RandomStagger(r.Info.MaxAge / 16) } select {