diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 789227245..1bf14b341 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -83,6 +83,8 @@ type Cache struct { // typeEntry is a single type that is registered with a Cache. type typeEntry struct { + // Name that was used to register the Type + Name string Type Type Opts *RegisterOptions } @@ -193,7 +195,7 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { c.typesLock.Lock() defer c.typesLock.Unlock() - c.types[n] = typeEntry{Type: typ, Opts: opts} + c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts} } // Get loads the data for the given type and request. If data satisfying the @@ -211,7 +213,15 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // error is returned on timeout. This matches the behavior of Consul blocking // queries. func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { - return c.getWithIndex(t, r, r.CacheInfo().MinIndex) + c.typesLock.RLock() + tEntry, ok := c.types[t] + c.typesLock.RUnlock() + if !ok { + // Shouldn't happen given that we successfully fetched this at least + // once. But be robust against panics. + return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) + } + return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex) } // getEntryLocked retrieves a cache entry and checks if it is ready to be @@ -258,18 +268,17 @@ func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duratio // getWithIndex implements the main Get functionality but allows internal // callers (Watch) to manipulate the blocking index separately from the actual // request object. -func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { +func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { info := r.CacheInfo() if info.Key == "" { metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) // If no key is specified, then we do not cache this request. // Pass directly through to the backend. - return c.fetchDirect(t, r, minIndex) + return c.fetchDirect(tEntry, r, minIndex) } - // Get the actual key for our entry - key := c.entryKey(t, &info) + key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key) // First time through first := true @@ -278,16 +287,6 @@ func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, var timeoutCh <-chan time.Time RETRY_GET: - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - // Shouldn't happen given that we successfully fetched this at least - // once. But be robust against panics. - return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) - } - // Get the current value c.entriesLock.RLock() _, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) @@ -296,7 +295,7 @@ RETRY_GET: if cacheHit { meta := ResultMeta{Index: entry.Index} if first { - metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) meta.Hit = true } @@ -351,11 +350,11 @@ RETRY_GET: // We increment two different counters for cache misses depending on // whether we're missing because we didn't have the data at all, // or if we're missing because we're blocking on a set index. + missKey := "miss_block" if minIndex == 0 { - metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1) - } else { - metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1) + missKey = "miss_new" } + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1) } // Set our timeout channel if we must @@ -365,7 +364,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first) + waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first) if err != nil { return nil, ResultMeta{Index: entry.Index}, err } @@ -384,12 +383,6 @@ RETRY_GET: } } -// entryKey returns the key for the entry in the cache. See the note -// about the entry key format in the structure docs for Cache. -func (c *Cache) entryKey(t string, r *RequestInfo) string { - return makeEntryKey(t, r.Datacenter, r.Token, r.Key) -} - func makeEntryKey(t, dc, token, key string) string { return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) } @@ -402,15 +395,7 @@ func makeEntryKey(t, dc, token, key string) string { // If allowNew is true then the fetch should create the cache entry // if it doesn't exist. If this is false, then fetch will do nothing // if the entry doesn't exist. This latter case is to support refreshing. -func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return nil, fmt.Errorf("unknown type in cache: %s", t) - } - +func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { info := r.CacheInfo() // We acquire a write lock because we may have to set Fetching to true. @@ -543,7 +528,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // Error handling if err == nil { metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1) if result.Index > 0 { // Reset the attempts counter so we don't have any backoff @@ -572,7 +557,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min } } else { metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1) // Increment attempt counter attempt++ @@ -613,7 +598,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. if tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, attempt, t, key, r) + c.refresh(tEntry.Opts, attempt, tEntry, key, r) } }() @@ -623,15 +608,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // fetchDirect fetches the given request with no caching. Because this // bypasses the caching entirely, multiple matching requests will result // in multiple actual RPC calls (unlike fetch). -func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) - } - +func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { // Fetch it with the min index specified directly by the request. result, err := tEntry.Type.Fetch(FetchOptions{ MinIndex: minIndex, @@ -661,7 +638,7 @@ func backOffWait(failures uint) time.Duration { // refresh triggers a fetch for a specific Request according to the // registration options. -func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) { +func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh { return @@ -684,7 +661,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin // Trigger. The "allowNew" field is false because in the time we were // waiting to refresh we may have expired and got evicted. If that // happened, we don't want to create a new entry. - c.fetch(t, key, r, false, attempt, 0, true, true) + c.fetch(tEntry, key, r, false, attempt, 0, true, true) } // runExpiryLoop is a blocking function that watches the expiration diff --git a/agent/cache/watch.go b/agent/cache/watch.go index fca176fe0..af3b097c5 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -61,19 +61,19 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request, return fmt.Errorf("unknown type in cache: %s", t) } if tEntry.Type.SupportsBlocking() { - go c.notifyBlockingQuery(ctx, t, r, correlationID, ch) + go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) } else { info := r.CacheInfo() if info.MaxAge == 0 { return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") } - go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge) + go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) } return nil } -func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) { +func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) { // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) @@ -86,7 +86,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co } // Blocking request - res, meta, err := c.getWithIndex(t, r, index) + res, meta, err := c.getWithIndex(tEntry, r, index) // Check context hasn't been canceled if ctx.Err() != nil { @@ -132,7 +132,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co } } -func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { +func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { index := uint64(0) failures := uint(0) @@ -145,7 +145,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, cor } // Make the request - res, meta, err := c.getWithIndex(t, r, index) + res, meta, err := c.getWithIndex(tEntry, r, index) // Check context hasn't been canceled if ctx.Err() != nil {