Merge pull request #7585 from hashicorp/dnephin/agent-cache
agent/cache: Small changes to hopefully improve readability
This commit is contained in:
commit
3237a55e32
|
@ -1715,15 +1715,6 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
|
|||
return a.delegate.RPC(method, args, reply)
|
||||
}
|
||||
|
||||
// SnapshotRPC performs the requested snapshot RPC against the Consul server in
|
||||
// a streaming manner. The contents of in will be read and passed along as the
|
||||
// payload, and the response message will determine the error status, and any
|
||||
// return payload will be written to out.
|
||||
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
|
||||
replyFn structs.SnapshotReplyFn) error {
|
||||
return a.delegate.SnapshotRPC(args, in, out, replyFn)
|
||||
}
|
||||
|
||||
// Leave is used to prepare the agent for a graceful shutdown
|
||||
func (a *Agent) Leave() error {
|
||||
return a.delegate.Leave()
|
||||
|
|
134
agent/cache/cache.go
vendored
134
agent/cache/cache.go
vendored
|
@ -227,42 +227,43 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
|||
// getEntryLocked retrieves a cache entry and checks if it is ready to be
|
||||
// returned given the other parameters. It reads from entries and the caller
|
||||
// has to issue a read lock if necessary.
|
||||
func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) {
|
||||
func (c *Cache) getEntryLocked(
|
||||
tEntry typeEntry,
|
||||
key string,
|
||||
maxAge time.Duration,
|
||||
revalidate bool,
|
||||
minIndex uint64,
|
||||
) (entryExists bool, entryValid bool, entry cacheEntry) {
|
||||
entry, ok := c.entries[key]
|
||||
cacheHit := false
|
||||
|
||||
if !ok {
|
||||
return ok, cacheHit, entry
|
||||
if !entry.Valid {
|
||||
return ok, false, entry
|
||||
}
|
||||
|
||||
// Check if we have a hit
|
||||
cacheHit = ok && entry.Valid
|
||||
|
||||
supportsBlocking := tEntry.Type.SupportsBlocking()
|
||||
|
||||
// Check index is not specified or lower than value, or the type doesn't
|
||||
// support blocking.
|
||||
if cacheHit && supportsBlocking &&
|
||||
minIndex > 0 && minIndex >= entry.Index {
|
||||
if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index {
|
||||
// MinIndex was given and matches or is higher than current value so we
|
||||
// ignore the cache and fallthrough to blocking on a new value below.
|
||||
cacheHit = false
|
||||
return true, false, entry
|
||||
}
|
||||
|
||||
// Check MaxAge is not exceeded if this is not a background refreshing type
|
||||
// and MaxAge was specified.
|
||||
if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 &&
|
||||
!entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) {
|
||||
cacheHit = false
|
||||
if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) {
|
||||
return true, false, entry
|
||||
}
|
||||
|
||||
// Check if we are requested to revalidate. If so the first time round the
|
||||
// Check if re-validate is requested. If so the first time round the
|
||||
// loop is not a hit but subsequent ones should be treated normally.
|
||||
if cacheHit && !tEntry.Opts.Refresh && revalidate {
|
||||
cacheHit = false
|
||||
if !tEntry.Opts.Refresh && revalidate {
|
||||
return true, false, entry
|
||||
}
|
||||
|
||||
return ok, cacheHit, entry
|
||||
return true, true, entry
|
||||
}
|
||||
|
||||
func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
|
||||
return !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt)
|
||||
}
|
||||
|
||||
// getWithIndex implements the main Get functionality but allows internal
|
||||
|
@ -289,10 +290,10 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte
|
|||
RETRY_GET:
|
||||
// Get the current value
|
||||
c.entriesLock.RLock()
|
||||
_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
|
||||
_, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
|
||||
c.entriesLock.RUnlock()
|
||||
|
||||
if cacheHit {
|
||||
if entryValid {
|
||||
meta := ResultMeta{Index: entry.Index}
|
||||
if first {
|
||||
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1)
|
||||
|
@ -316,7 +317,7 @@ RETRY_GET:
|
|||
|
||||
// Touch the expiration and fix the heap.
|
||||
c.entriesLock.Lock()
|
||||
entry.Expiry.Reset()
|
||||
entry.Expiry.Update(tEntry.Opts.LastGetTTL)
|
||||
c.entriesExpiryHeap.Fix(entry.Expiry)
|
||||
c.entriesLock.Unlock()
|
||||
|
||||
|
@ -401,11 +402,11 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
|
|||
// We acquire a write lock because we may have to set Fetching to true.
|
||||
c.entriesLock.Lock()
|
||||
defer c.entriesLock.Unlock()
|
||||
ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
|
||||
ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
|
||||
|
||||
// This handles the case where a fetch succeeded after checking for its existence in
|
||||
// getWithIndex. This ensures that we don't miss updates.
|
||||
if ok && cacheHit && !ignoreExisting {
|
||||
if ok && entryValid && !ignoreExisting {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch, nil
|
||||
|
@ -581,11 +582,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
|
|||
// initial expiry information and insert. If we're already in
|
||||
// the heap we do nothing since we're reusing the same entry.
|
||||
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
|
||||
newEntry.Expiry = &cacheEntryExpiry{
|
||||
Key: key,
|
||||
TTL: tEntry.Opts.LastGetTTL,
|
||||
}
|
||||
newEntry.Expiry.Reset()
|
||||
newEntry.Expiry = &cacheEntryExpiry{Key: key}
|
||||
newEntry.Expiry.Update(tEntry.Opts.LastGetTTL)
|
||||
heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
|
||||
}
|
||||
|
||||
|
@ -598,7 +596,25 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
|
|||
// If refresh is enabled, run the refresh in due time. The refresh
|
||||
// below might block, but saves us from spawning another goroutine.
|
||||
if tEntry.Opts.Refresh {
|
||||
c.refresh(tEntry.Opts, attempt, tEntry, key, r)
|
||||
// Check if cache was stopped
|
||||
if atomic.LoadUint32(&c.stopped) == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
// If we're over the attempt minimum, start an exponential backoff.
|
||||
if wait := backOffWait(attempt); wait > 0 {
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
// If we have a timer, wait for it
|
||||
if tEntry.Opts.RefreshTimer > 0 {
|
||||
time.Sleep(tEntry.Opts.RefreshTimer)
|
||||
}
|
||||
|
||||
// Trigger. The "allowNew" field is false because in the time we were
|
||||
// waiting to refresh we may have expired and got evicted. If that
|
||||
// happened, we don't want to create a new entry.
|
||||
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -610,15 +626,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
|
|||
// in multiple actual RPC calls (unlike fetch).
|
||||
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
|
||||
// Fetch it with the min index specified directly by the request.
|
||||
result, err := tEntry.Type.Fetch(FetchOptions{
|
||||
MinIndex: minIndex,
|
||||
}, r)
|
||||
if err != nil {
|
||||
return nil, ResultMeta{}, err
|
||||
}
|
||||
|
||||
// Return the result and ignore the rest
|
||||
return result.Value, ResultMeta{}, nil
|
||||
result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r)
|
||||
return result.Value, ResultMeta{}, err
|
||||
}
|
||||
|
||||
func backOffWait(failures uint) time.Duration {
|
||||
|
@ -636,34 +645,6 @@ func backOffWait(failures uint) time.Duration {
|
|||
return 0
|
||||
}
|
||||
|
||||
// refresh triggers a fetch for a specific Request according to the
|
||||
// registration options.
|
||||
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) {
|
||||
// Sanity-check, we should not schedule anything that has refresh disabled
|
||||
if !opts.Refresh {
|
||||
return
|
||||
}
|
||||
// Check if cache was stopped
|
||||
if atomic.LoadUint32(&c.stopped) == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
// If we're over the attempt minimum, start an exponential backoff.
|
||||
if wait := backOffWait(attempt); wait > 0 {
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
// If we have a timer, wait for it
|
||||
if opts.RefreshTimer > 0 {
|
||||
time.Sleep(opts.RefreshTimer)
|
||||
}
|
||||
|
||||
// Trigger. The "allowNew" field is false because in the time we were
|
||||
// waiting to refresh we may have expired and got evicted. If that
|
||||
// happened, we don't want to create a new entry.
|
||||
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
|
||||
}
|
||||
|
||||
// runExpiryLoop is a blocking function that watches the expiration
|
||||
// heap and invalidates entries that have expired.
|
||||
func (c *Cache) runExpiryLoop() {
|
||||
|
@ -733,18 +714,15 @@ func (c *Cache) Close() error {
|
|||
// AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first
|
||||
// time because it requires a special RPCType. Subsequent runs are fine though.
|
||||
func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error {
|
||||
// Check the type that we're prepolulating
|
||||
c.typesLock.RLock()
|
||||
tEntry, ok := c.types[t]
|
||||
c.typesLock.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
key := makeEntryKey(t, dc, token, k)
|
||||
newEntry := cacheEntry{
|
||||
Valid: true, Value: res.Value, State: res.State, Index: res.Index,
|
||||
FetchedAt: time.Now(), Waiter: make(chan struct{}),
|
||||
Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL},
|
||||
Valid: true,
|
||||
Value: res.Value,
|
||||
State: res.State,
|
||||
Index: res.Index,
|
||||
FetchedAt: time.Now(),
|
||||
Waiter: make(chan struct{}),
|
||||
Expiry: &cacheEntryExpiry{Key: key},
|
||||
}
|
||||
c.entriesLock.Lock()
|
||||
c.entries[key] = newEntry
|
||||
|
|
13
agent/cache/entry.go
vendored
13
agent/cache/entry.go
vendored
|
@ -47,15 +47,14 @@ type cacheEntry struct {
|
|||
// entry. Any modifications to this struct should be done only while
|
||||
// the Cache entriesLock is held.
|
||||
type cacheEntryExpiry struct {
|
||||
Key string // Key in the cache map
|
||||
Expires time.Time // Time when entry expires (monotonic clock)
|
||||
TTL time.Duration // TTL for this entry to extend when resetting
|
||||
HeapIndex int // Index in the heap
|
||||
Key string // Key in the cache map
|
||||
Expires time.Time // Time when entry expires (monotonic clock)
|
||||
HeapIndex int // Index in the heap
|
||||
}
|
||||
|
||||
// Reset resets the expiration to be the ttl duration from now.
|
||||
func (e *cacheEntryExpiry) Reset() {
|
||||
e.Expires = time.Now().Add(e.TTL)
|
||||
// Update the expiry to d time from now.
|
||||
func (e *cacheEntryExpiry) Update(d time.Duration) {
|
||||
e.Expires = time.Now().Add(d)
|
||||
}
|
||||
|
||||
// expiryHeap is a heap implementation that stores information about
|
||||
|
|
53
agent/cache/watch.go
vendored
53
agent/cache/watch.go
vendored
|
@ -50,26 +50,30 @@ type UpdateEvent struct {
|
|||
// value that allows them to disambiguate between events in the returned chan
|
||||
// when sharing a chan between multiple cache entries. If the chan is closed,
|
||||
// the notify loop will terminate.
|
||||
func (c *Cache) Notify(ctx context.Context, t string, r Request,
|
||||
correlationID string, ch chan<- UpdateEvent) error {
|
||||
|
||||
// Get the type that we're fetching
|
||||
func (c *Cache) Notify(
|
||||
ctx context.Context,
|
||||
t string,
|
||||
r Request,
|
||||
correlationID string,
|
||||
ch chan<- UpdateEvent,
|
||||
) error {
|
||||
c.typesLock.RLock()
|
||||
tEntry, ok := c.types[t]
|
||||
c.typesLock.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
|
||||
if tEntry.Type.SupportsBlocking() {
|
||||
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
|
||||
} else {
|
||||
info := r.CacheInfo()
|
||||
if info.MaxAge == 0 {
|
||||
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
|
||||
}
|
||||
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
|
||||
return nil
|
||||
}
|
||||
|
||||
info := r.CacheInfo()
|
||||
if info.MaxAge == 0 {
|
||||
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
|
||||
}
|
||||
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
|
|||
index = meta.Index
|
||||
}
|
||||
|
||||
var wait time.Duration
|
||||
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||||
// a zero index are considered as failures since we need to not get stuck
|
||||
// in a busy loop.
|
||||
wait := 0 * time.Second
|
||||
if err == nil && meta.Index > 0 {
|
||||
failures = 0
|
||||
} else {
|
||||
|
@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
failures++
|
||||
}
|
||||
|
||||
var wait time.Duration
|
||||
// Determining how long to wait before the next poll is complicated.
|
||||
// First off the happy path and the error path waits are handled distinctly
|
||||
//
|
||||
|
@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
// as this would eliminate the single-flighting of these requests in the cache and
|
||||
// the efficiencies gained by it.
|
||||
if failures > 0 {
|
||||
|
||||
errWait := backOffWait(failures)
|
||||
select {
|
||||
case <-time.After(errWait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
wait = backOffWait(failures)
|
||||
} else {
|
||||
// Default to immediately re-poll. This only will happen if the data
|
||||
// we just got out of the cache is already too stale
|
||||
pollWait := 0 * time.Second
|
||||
|
||||
// Calculate when the cached data's Age will get too stale and
|
||||
// need to be re-queried. When the data's Age already exceeds the
|
||||
// maxAge the pollWait value is left at 0 to immediately re-poll
|
||||
if meta.Age <= maxAge {
|
||||
pollWait = maxAge - meta.Age
|
||||
wait = maxAge - meta.Age
|
||||
}
|
||||
|
||||
// Add a small amount of random jitter to the polling time. One
|
||||
|
@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
|
|||
// and then immediately have to re-fetch again. That wouldn't
|
||||
// be terrible but it would expend a bunch more cpu cycles when
|
||||
// we can definitely avoid it.
|
||||
pollWait += lib.RandomStagger(maxAge / 16)
|
||||
wait += lib.RandomStagger(maxAge / 16)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(pollWait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-time.After(wait):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,14 +31,14 @@ func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (inte
|
|||
// Don't bother sending any request body through since it will
|
||||
// be ignored.
|
||||
var null bytes.Buffer
|
||||
if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil {
|
||||
if err := s.agent.delegate.SnapshotRPC(&args, &null, resp, replyFn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
|
||||
case "PUT":
|
||||
args.Op = structs.SnapshotRestore
|
||||
if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil {
|
||||
if err := s.agent.delegate.SnapshotRPC(&args, req.Body, resp, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
|
|
Loading…
Reference in a new issue