agent/cache: Reduce differences between notify implementations
These two notify functions are very similar. There appear to be just enough differences that trying to parameterize the differences may not improve things. For now, reduce some of the cosmetic differences so that the material differences are more obvious.
This commit is contained in:
parent
4d398d26ae
commit
371cf05340
41
agent/cache/watch.go
vendored
41
agent/cache/watch.go
vendored
|
@ -50,26 +50,30 @@ type UpdateEvent struct {
|
|||
// value that allows them to disambiguate between events in the returned chan
|
||||
// when sharing a chan between multiple cache entries. If the chan is closed,
|
||||
// the notify loop will terminate.
|
||||
func (c *Cache) Notify(ctx context.Context, t string, r Request,
|
||||
correlationID string, ch chan<- UpdateEvent) error {
|
||||
|
||||
// Get the type that we're fetching
|
||||
func (c *Cache) Notify(
|
||||
ctx context.Context,
|
||||
t string,
|
||||
r Request,
|
||||
correlationID string,
|
||||
ch chan<- UpdateEvent,
|
||||
) error {
|
||||
c.typesLock.RLock()
|
||||
tEntry, ok := c.types[t]
|
||||
c.typesLock.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
|
||||
if tEntry.Type.SupportsBlocking() {
|
||||
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
info := r.CacheInfo()
|
||||
if info.MaxAge == 0 {
|
||||
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
|
||||
}
|
||||
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
|
|||
index = meta.Index
|
||||
}
|
||||
|
||||
var wait time.Duration
|
||||
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||||
// a zero index are considered as failures since we need to not get stuck
|
||||
// in a busy loop.
|
||||
wait := 0 * time.Second
|
||||
if err == nil && meta.Index > 0 {
|
||||
failures = 0
|
||||
} else {
|
||||
|
@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
failures++
|
||||
}
|
||||
|
||||
var wait time.Duration
|
||||
// Determining how long to wait before the next poll is complicated.
|
||||
// First off the happy path and the error path waits are handled distinctly
|
||||
//
|
||||
|
@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
// as this would eliminate the single-flighting of these requests in the cache and
|
||||
// the efficiencies gained by it.
|
||||
if failures > 0 {
|
||||
|
||||
errWait := backOffWait(failures)
|
||||
select {
|
||||
case <-time.After(errWait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
wait = backOffWait(failures)
|
||||
} else {
|
||||
// Default to immediately re-poll. This only will happen if the data
|
||||
// we just got out of the cache is already too stale
|
||||
pollWait := 0 * time.Second
|
||||
|
||||
// Calculate when the cached data's Age will get too stale and
|
||||
// need to be re-queried. When the data's Age already exceeds the
|
||||
// maxAge the pollWait value is left at 0 to immediately re-poll
|
||||
if meta.Age <= maxAge {
|
||||
pollWait = maxAge - meta.Age
|
||||
wait = maxAge - meta.Age
|
||||
}
|
||||
|
||||
// Add a small amount of random jitter to the polling time. One
|
||||
|
@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
// and then immediately have to re-fetch again. That wouldn't
|
||||
// be terrible but it would expend a bunch more cpu cycles when
|
||||
// we can definitely avoid it.
|
||||
pollWait += lib.RandomStagger(maxAge / 16)
|
||||
wait += lib.RandomStagger(maxAge / 16)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(pollWait):
|
||||
case <-time.After(wait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue