agent/cache: reduce function arguments by removing duplicates

A few of the unexported functions in agent/cache took a large number of
arguments. These arguments were effectively overrides for values that
were provided in RequestInfo.

By using a struct we can not only reduce the number of arguments, but
also simplify the logic by removing the need for overrides.
This commit is contained in:
Daniel Nephin 2020-04-13 14:49:13 -04:00
parent 7536b41e8f
commit d015d3c563
2 changed files with 60 additions and 44 deletions

84
agent/cache/cache.go vendored
View File

@ -225,7 +225,28 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
// once. But be robust against panics. // once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
} }
return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex) return c.getWithIndex(newGetOptions(tEntry, r))
}
// getOptions contains the arguments for a Get request. It is used in place of
// Request so that internal functions can modify Info without having to extract
// it from the Request each time.
type getOptions struct {
// Fetch is a closure over tEntry.Type.Fetch which provides the original
// Request from the caller.
Fetch func(opts FetchOptions) (FetchResult, error)
Info RequestInfo
TypeEntry typeEntry
}
func newGetOptions(tEntry typeEntry, r Request) getOptions {
return getOptions{
Fetch: func(opts FetchOptions) (FetchResult, error) {
return tEntry.Type.Fetch(opts, r)
},
Info: r.CacheInfo(),
TypeEntry: tEntry,
}
} }
// getEntryLocked retrieves a cache entry and checks if it is ready to be // getEntryLocked retrieves a cache entry and checks if it is ready to be
@ -234,9 +255,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
func (c *Cache) getEntryLocked( func (c *Cache) getEntryLocked(
tEntry typeEntry, tEntry typeEntry,
key string, key string,
maxAge time.Duration, info RequestInfo,
revalidate bool,
minIndex uint64,
) (entryExists bool, entryValid bool, entry cacheEntry) { ) (entryExists bool, entryValid bool, entry cacheEntry) {
entry, ok := c.entries[key] entry, ok := c.entries[key]
if !entry.Valid { if !entry.Valid {
@ -245,7 +264,7 @@ func (c *Cache) getEntryLocked(
// Check index is not specified or lower than value, or the type doesn't // Check index is not specified or lower than value, or the type doesn't
// support blocking. // support blocking.
if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index { if tEntry.Opts.SupportsBlocking && info.MinIndex > 0 && info.MinIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we // MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below. // ignore the cache and fallthrough to blocking on a new value below.
return true, false, entry return true, false, entry
@ -253,13 +272,13 @@ func (c *Cache) getEntryLocked(
// Check MaxAge is not exceeded if this is not a background refreshing type // Check MaxAge is not exceeded if this is not a background refreshing type
// and MaxAge was specified. // and MaxAge was specified.
if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) { if !tEntry.Opts.Refresh && info.MaxAge > 0 && entryExceedsMaxAge(info.MaxAge, entry) {
return true, false, entry return true, false, entry
} }
// Check if re-validate is requested. If so the first time round the // Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally. // loop is not a hit but subsequent ones should be treated normally.
if !tEntry.Opts.Refresh && revalidate { if !tEntry.Opts.Refresh && info.MustRevalidate {
return true, false, entry return true, false, entry
} }
@ -273,17 +292,17 @@ func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
// getWithIndex implements the main Get functionality but allows internal // getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual // callers (Watch) to manipulate the blocking index separately from the actual
// request object. // request object.
func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { func (c *Cache) getWithIndex(r getOptions) (interface{}, ResultMeta, error) {
info := r.CacheInfo() if r.Info.Key == "" {
if info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request. // If no key is specified, then we do not cache this request.
// Pass directly through to the backend. // Pass directly through to the backend.
return c.fetchDirect(tEntry, r, minIndex) result, err := r.Fetch(FetchOptions{MinIndex: r.Info.MinIndex})
return result.Value, ResultMeta{}, err
} }
key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key) key := makeEntryKey(r.TypeEntry.Name, r.Info.Datacenter, r.Info.Token, r.Info.Key)
// First time through // First time through
first := true first := true
@ -294,19 +313,19 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte
RETRY_GET: RETRY_GET:
// Get the current value // Get the current value
c.entriesLock.RLock() c.entriesLock.RLock()
_, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) _, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
c.entriesLock.RUnlock() c.entriesLock.RUnlock()
if entryValid { if entryValid {
meta := ResultMeta{Index: entry.Index} meta := ResultMeta{Index: entry.Index}
if first { if first {
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, "hit"}, 1)
meta.Hit = true meta.Hit = true
} }
// If refresh is enabled, calculate age based on whether the background // If refresh is enabled, calculate age based on whether the background
// routine is still connected. // routine is still connected.
if tEntry.Opts.Refresh { if r.TypeEntry.Opts.Refresh {
meta.Age = time.Duration(0) meta.Age = time.Duration(0)
if !entry.RefreshLostContact.IsZero() { if !entry.RefreshLostContact.IsZero() {
meta.Age = time.Since(entry.RefreshLostContact) meta.Age = time.Since(entry.RefreshLostContact)
@ -321,7 +340,7 @@ RETRY_GET:
// Touch the expiration and fix the heap. // Touch the expiration and fix the heap.
c.entriesLock.Lock() c.entriesLock.Lock()
entry.Expiry.Update(tEntry.Opts.LastGetTTL) entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL)
c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesExpiryHeap.Fix(entry.Expiry)
c.entriesLock.Unlock() c.entriesLock.Unlock()
@ -356,20 +375,20 @@ RETRY_GET:
// whether we're missing because we didn't have the data at all, // whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index. // or if we're missing because we're blocking on a set index.
missKey := "miss_block" missKey := "miss_block"
if minIndex == 0 { if r.Info.MinIndex == 0 {
missKey = "miss_new" missKey = "miss_new"
} }
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1) metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, missKey}, 1)
} }
// Set our timeout channel if we must // Set our timeout channel if we must
if info.Timeout > 0 && timeoutCh == nil { if r.Info.Timeout > 0 && timeoutCh == nil {
timeoutCh = time.After(info.Timeout) timeoutCh = time.After(r.Info.Timeout)
} }
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data. // value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first) waiterCh, err := c.fetch(key, r, true, 0, false)
if err != nil { if err != nil {
return nil, ResultMeta{Index: entry.Index}, err return nil, ResultMeta{Index: entry.Index}, err
} }
@ -380,6 +399,7 @@ RETRY_GET:
select { select {
case <-waiterCh: case <-waiterCh:
// Our fetch returned, retry the get from the cache. // Our fetch returned, retry the get from the cache.
r.Info.MustRevalidate = false
goto RETRY_GET goto RETRY_GET
case <-timeoutCh: case <-timeoutCh:
@ -400,13 +420,13 @@ func makeEntryKey(t, dc, token, key string) string {
// If allowNew is true then the fetch should create the cache entry // If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing // if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing. // if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) (<-chan struct{}, error) {
info := r.CacheInfo() info := r.Info
// We acquire a write lock because we may have to set Fetching to true. // We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, info)
// This handles the case where a fetch succeeded after checking for its existence in // This handles the case where a fetch succeeded after checking for its existence in
// getWithIndex. This ensures that we don't miss updates. // getWithIndex. This ensures that we don't miss updates.
@ -444,6 +464,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
c.entries[key] = entry c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries))) metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
tEntry := r.TypeEntry
// The actual Fetch must be performed in a goroutine. // The actual Fetch must be performed in a goroutine.
go func() { go func() {
// If we have background refresh and currently are in "disconnected" state, // If we have background refresh and currently are in "disconnected" state,
@ -482,7 +503,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
} }
// Start building the new entry by blocking on the fetch. // Start building the new entry by blocking on the fetch.
result, err := tEntry.Type.Fetch(fOpts, r) result, err := r.Fetch(fOpts)
if connectedTimer != nil { if connectedTimer != nil {
connectedTimer.Stop() connectedTimer.Stop()
} }
@ -618,22 +639,15 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// Trigger. The "allowNew" field is false because in the time we were // Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that // waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry. // happened, we don't want to create a new entry.
c.fetch(tEntry, key, r, false, attempt, 0, true, true) r.Info.MustRevalidate = false
r.Info.MinIndex = 0
c.fetch(key, r, false, attempt, true)
} }
}() }()
return entry.Waiter, nil return entry.Waiter, nil
} }
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r)
return result.Value, ResultMeta{}, err
}
func backOffWait(failures uint) time.Duration { func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin { if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin shift := failures - CacheRefreshBackoffMin

20
agent/cache/watch.go vendored
View File

@ -65,7 +65,7 @@ func (c *Cache) Notify(
} }
if tEntry.Opts.SupportsBlocking { if tEntry.Opts.SupportsBlocking {
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch)
return nil return nil
} }
@ -73,11 +73,11 @@ func (c *Cache) Notify(
if info.MaxAge == 0 { if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
} }
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch)
return nil return nil
} }
func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) { func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached // Always start at 0 index to deliver the initial (possibly currently cached
// value). // value).
index := uint64(0) index := uint64(0)
@ -90,7 +90,8 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
} }
// Blocking request // Blocking request
res, meta, err := c.getWithIndex(tEntry, r, index) r.Info.MinIndex = index
res, meta, err := c.getWithIndex(r)
// Check context hasn't been canceled // Check context hasn't been canceled
if ctx.Err() != nil { if ctx.Err() != nil {
@ -136,7 +137,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
} }
} }
func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) {
index := uint64(0) index := uint64(0)
failures := uint(0) failures := uint(0)
@ -149,7 +150,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
} }
// Make the request // Make the request
res, meta, err := c.getWithIndex(tEntry, r, index) r.Info.MinIndex = index
res, meta, err := c.getWithIndex(r)
// Check context hasn't been canceled // Check context hasn't been canceled
if ctx.Err() != nil { if ctx.Err() != nil {
@ -204,8 +206,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// Calculate when the cached data's Age will get too stale and // Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the // need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll // maxAge the pollWait value is left at 0 to immediately re-poll
if meta.Age <= maxAge { if meta.Age <= r.Info.MaxAge {
wait = maxAge - meta.Age wait = r.Info.MaxAge - meta.Age
} }
// Add a small amount of random jitter to the polling time. One // Add a small amount of random jitter to the polling time. One
@ -217,7 +219,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// and then immediately have to re-fetch again. That wouldn't // and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when // be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it. // we can definitely avoid it.
wait += lib.RandomStagger(maxAge / 16) wait += lib.RandomStagger(r.Info.MaxAge / 16)
} }
select { select {