agent/cache: move typeEntry lookup to the edge

This change moves all the typeEntry lookups to the first step in the exported methods,
and makes unexported internals accept the typeEntry struct.

This change is primarily intended to make it easier to extract the container of caches
from the Cache type.

It may incidentally reduce locking in fetch, but that was not a goal.
This commit is contained in:
Daniel Nephin 2020-04-03 16:01:56 -04:00
parent f5eb6ab539
commit ab068325da
2 changed files with 33 additions and 56 deletions

77
agent/cache/cache.go vendored
View File

@ -83,6 +83,8 @@ type Cache struct {
// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
	// Name that was used to register the Type
	Name string
	// Type is the registered implementation used to fetch values of
	// this type from the backend (see tEntry.Type.Fetch and
	// Type.SupportsBlocking uses elsewhere in this package).
	Type Type
	// Opts holds the options supplied at registration time, e.g. the
	// Refresh flag consulted after a fetch completes.
	Opts *RegisterOptions
}
@ -193,7 +195,7 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
c.typesLock.Lock()
defer c.typesLock.Unlock()
c.types[n] = typeEntry{Type: typ, Opts: opts}
c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts}
}
// Get loads the data for the given type and request. If data satisfying the
@ -211,7 +213,15 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
// Shouldn't happen given that we successfully fetched this at least
// once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex)
}
// getEntryLocked retrieves a cache entry and checks if it is ready to be
@ -258,18 +268,17 @@ func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duratio
// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
info := r.CacheInfo()
if info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(t, r, minIndex)
return c.fetchDirect(tEntry, r, minIndex)
}
// Get the actual key for our entry
key := c.entryKey(t, &info)
key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key)
// First time through
first := true
@ -278,16 +287,6 @@ func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{},
var timeoutCh <-chan time.Time
RETRY_GET:
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
// Shouldn't happen given that we successfully fetched this at least
// once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
// Get the current value
c.entriesLock.RLock()
_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
@ -296,7 +295,7 @@ RETRY_GET:
if cacheHit {
meta := ResultMeta{Index: entry.Index}
if first {
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1)
meta.Hit = true
}
@ -351,11 +350,11 @@ RETRY_GET:
// We increment two different counters for cache misses depending on
// whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index.
missKey := "miss_block"
if minIndex == 0 {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
} else {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
missKey = "miss_new"
}
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1)
}
// Set our timeout channel if we must
@ -365,7 +364,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first)
waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first)
if err != nil {
return nil, ResultMeta{Index: entry.Index}, err
}
@ -384,12 +383,6 @@ RETRY_GET:
}
}
// entryKey returns the key for the entry in the cache. See the note
// about the entry key format in the structure docs for Cache.
func (c *Cache) entryKey(t string, r *RequestInfo) string {
return makeEntryKey(t, r.Datacenter, r.Token, r.Key)
}
// makeEntryKey builds the composite cache-entry key in the
// "type/datacenter/token/key" format described in the Cache struct docs.
func makeEntryKey(t, dc, token, key string) string {
	const sep = "/"
	return t + sep + dc + sep + token + sep + key
}
@ -402,15 +395,7 @@ func makeEntryKey(t, dc, token, key string) string {
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown type in cache: %s", t)
}
func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
info := r.CacheInfo()
// We acquire a write lock because we may have to set Fetching to true.
@ -543,7 +528,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// Error handling
if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1)
if result.Index > 0 {
// Reset the attempts counter so we don't have any backoff
@ -572,7 +557,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
}
} else {
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1)
// Increment attempt counter
attempt++
@ -613,7 +598,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts.Refresh {
c.refresh(tEntry.Opts, attempt, t, key, r)
c.refresh(tEntry.Opts, attempt, tEntry, key, r)
}
}()
@ -623,15 +608,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: minIndex,
@ -661,7 +638,7 @@ func backOffWait(failures uint) time.Duration {
// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) {
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh {
return
@ -684,7 +661,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(t, key, r, false, attempt, 0, true, true)
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
}
// runExpiryLoop is a blocking function that watches the expiration

12
agent/cache/watch.go vendored
View File

@ -61,19 +61,19 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request,
return fmt.Errorf("unknown type in cache: %s", t)
}
if tEntry.Type.SupportsBlocking() {
go c.notifyBlockingQuery(ctx, t, r, correlationID, ch)
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
} else {
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge)
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
}
return nil
}
func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) {
func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
@ -86,7 +86,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
}
// Blocking request
res, meta, err := c.getWithIndex(t, r, index)
res, meta, err := c.getWithIndex(tEntry, r, index)
// Check context hasn't been canceled
if ctx.Err() != nil {
@ -132,7 +132,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
}
}
func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
index := uint64(0)
failures := uint(0)
@ -145,7 +145,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, cor
}
// Make the request
res, meta, err := c.getWithIndex(t, r, index)
res, meta, err := c.getWithIndex(tEntry, r, index)
// Check context hasn't been canceled
if ctx.Err() != nil {