diff --git a/agent/agent.go b/agent/agent.go index 256785774..581199f22 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1715,15 +1715,6 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { return a.delegate.RPC(method, args, reply) } -// SnapshotRPC performs the requested snapshot RPC against the Consul server in -// a streaming manner. The contents of in will be read and passed along as the -// payload, and the response message will determine the error status, and any -// return payload will be written to out. -func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, - replyFn structs.SnapshotReplyFn) error { - return a.delegate.SnapshotRPC(args, in, out, replyFn) -} - // Leave is used to prepare the agent for a graceful shutdown func (a *Agent) Leave() error { return a.delegate.Leave() diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 1bf14b341..66146610f 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -227,42 +227,43 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { // getEntryLocked retrieves a cache entry and checks if it is ready to be // returned given the other parameters. It reads from entries and the caller // has to issue a read lock if necessary. -func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) { +func (c *Cache) getEntryLocked( + tEntry typeEntry, + key string, + maxAge time.Duration, + revalidate bool, + minIndex uint64, +) (entryExists bool, entryValid bool, entry cacheEntry) { entry, ok := c.entries[key] - cacheHit := false - - if !ok { - return ok, cacheHit, entry + if !entry.Valid { + return ok, false, entry } - // Check if we have a hit - cacheHit = ok && entry.Valid - - supportsBlocking := tEntry.Type.SupportsBlocking() - // Check index is not specified or lower than value, or the type doesn't // support blocking. - if cacheHit && supportsBlocking && - minIndex > 0 && minIndex >= entry.Index { + if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value below. - cacheHit = false + return true, false, entry } // Check MaxAge is not exceeded if this is not a background refreshing type // and MaxAge was specified. - if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 && - !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) { - cacheHit = false + if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) { + return true, false, entry } - // Check if we are requested to revalidate. If so the first time round the + // Check if re-validate is requested. If so the first time round the // loop is not a hit but subsequent ones should be treated normally. - if cacheHit && !tEntry.Opts.Refresh && revalidate { - cacheHit = false + if !tEntry.Opts.Refresh && revalidate { + return true, false, entry } - return ok, cacheHit, entry + return true, true, entry +} + +func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool { + return !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) } // getWithIndex implements the main Get functionality but allows internal @@ -289,10 +290,10 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte RETRY_GET: // Get the current value c.entriesLock.RLock() - _, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) + _, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) c.entriesLock.RUnlock() - if cacheHit { + if entryValid { meta := ResultMeta{Index: entry.Index} if first { metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) @@ -316,7 +317,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Reset() + entry.Expiry.Update(tEntry.Opts.LastGetTTL) c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesLock.Unlock() @@ -401,11 +402,11 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() - ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) + ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) // This handles the case where a fetch succeeded after checking for its existence in // getWithIndex. This ensures that we don't miss updates. - if ok && cacheHit && !ignoreExisting { + if ok && entryValid && !ignoreExisting { ch := make(chan struct{}) close(ch) return ch, nil @@ -581,11 +582,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { - newEntry.Expiry = &cacheEntryExpiry{ - Key: key, - TTL: tEntry.Opts.LastGetTTL, - } - newEntry.Expiry.Reset() + newEntry.Expiry = &cacheEntryExpiry{Key: key} + newEntry.Expiry.Update(tEntry.Opts.LastGetTTL) heap.Push(c.entriesExpiryHeap, newEntry.Expiry) } @@ -598,7 +596,25 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. if tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, attempt, tEntry, key, r) + // Check if cache was stopped + if atomic.LoadUint32(&c.stopped) == 1 { + return + } + + // If we're over the attempt minimum, start an exponential backoff. + if wait := backOffWait(attempt); wait > 0 { + time.Sleep(wait) + } + + // If we have a timer, wait for it + if tEntry.Opts.RefreshTimer > 0 { + time.Sleep(tEntry.Opts.RefreshTimer) + } + + // Trigger. The "allowNew" field is false because in the time we were + // waiting to refresh we may have expired and got evicted. If that + // happened, we don't want to create a new entry. + c.fetch(tEntry, key, r, false, attempt, 0, true, true) } }() @@ -610,15 +626,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // in multiple actual RPC calls (unlike fetch). func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { // Fetch it with the min index specified directly by the request. - result, err := tEntry.Type.Fetch(FetchOptions{ - MinIndex: minIndex, - }, r) - if err != nil { - return nil, ResultMeta{}, err - } - - // Return the result and ignore the rest - return result.Value, ResultMeta{}, nil + result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r) + return result.Value, ResultMeta{}, err } func backOffWait(failures uint) time.Duration { @@ -636,34 +645,6 @@ func backOffWait(failures uint) time.Duration { return 0 } -// refresh triggers a fetch for a specific Request according to the -// registration options. -func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) { - // Sanity-check, we should not schedule anything that has refresh disabled - if !opts.Refresh { - return - } - // Check if cache was stopped - if atomic.LoadUint32(&c.stopped) == 1 { - return - } - - // If we're over the attempt minimum, start an exponential backoff. - if wait := backOffWait(attempt); wait > 0 { - time.Sleep(wait) - } - - // If we have a timer, wait for it - if opts.RefreshTimer > 0 { - time.Sleep(opts.RefreshTimer) - } - - // Trigger. The "allowNew" field is false because in the time we were - // waiting to refresh we may have expired and got evicted. If that - // happened, we don't want to create a new entry. - c.fetch(tEntry, key, r, false, attempt, 0, true, true) -} - // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates entries that have expired. func (c *Cache) runExpiryLoop() { @@ -733,18 +714,15 @@ func (c *Cache) Close() error { // AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first // time because it requires a special RPCType. Subsequent runs are fine though. func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error { - // Check the type that we're prepolulating - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return fmt.Errorf("unknown type in cache: %s", t) - } key := makeEntryKey(t, dc, token, k) newEntry := cacheEntry{ - Valid: true, Value: res.Value, State: res.State, Index: res.Index, - FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL}, + Valid: true, + Value: res.Value, + State: res.State, + Index: res.Index, + FetchedAt: time.Now(), + Waiter: make(chan struct{}), + Expiry: &cacheEntryExpiry{Key: key}, } c.entriesLock.Lock() c.entries[key] = newEntry diff --git a/agent/cache/entry.go b/agent/cache/entry.go index dfeb982e0..9d7678a05 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -47,15 +47,14 @@ type cacheEntry struct { // entry. Any modifications to this struct should be done only while // the Cache entriesLock is held. type cacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - TTL time.Duration // TTL for this entry to extend when resetting - HeapIndex int // Index in the heap + Key string // Key in the cache map + Expires time.Time // Time when entry expires (monotonic clock) + HeapIndex int // Index in the heap } -// Reset resets the expiration to be the ttl duration from now. -func (e *cacheEntryExpiry) Reset() { - e.Expires = time.Now().Add(e.TTL) +// Update the expiry to d time from now. +func (e *cacheEntryExpiry) Update(d time.Duration) { + e.Expires = time.Now().Add(d) } // expiryHeap is a heap implementation that stores information about diff --git a/agent/cache/watch.go b/agent/cache/watch.go index af3b097c5..ed22a70d3 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -50,26 +50,30 @@ type UpdateEvent struct { // value that allows them to disambiguate between events in the returned chan // when sharing a chan between multiple cache entries. If the chan is closed, // the notify loop will terminate. -func (c *Cache) Notify(ctx context.Context, t string, r Request, - correlationID string, ch chan<- UpdateEvent) error { - - // Get the type that we're fetching +func (c *Cache) Notify( + ctx context.Context, + t string, + r Request, + correlationID string, + ch chan<- UpdateEvent, +) error { c.typesLock.RLock() tEntry, ok := c.types[t] c.typesLock.RUnlock() if !ok { return fmt.Errorf("unknown type in cache: %s", t) } + if tEntry.Type.SupportsBlocking() { go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) - } else { - info := r.CacheInfo() - if info.MaxAge == 0 { - return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") - } - go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) + return nil } + info := r.CacheInfo() + if info.MaxAge == 0 { + return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") + } + go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) return nil } @@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req index = meta.Index } + var wait time.Duration // Handle errors with backoff. Badly behaved blocking calls that returned // a zero index are considered as failures since we need to not get stuck // in a busy loop. - wait := 0 * time.Second if err == nil && meta.Index > 0 { failures = 0 } else { @@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ failures++ } + var wait time.Duration // Determining how long to wait before the next poll is complicated. // First off the happy path and the error path waits are handled distinctly // @@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // as this would eliminate the single-flighting of these requests in the cache and // the efficiencies gained by it. if failures > 0 { - - errWait := backOffWait(failures) - select { - case <-time.After(errWait): - case <-ctx.Done(): - return - } + wait = backOffWait(failures) } else { - // Default to immediately re-poll. This only will happen if the data - // we just got out of the cache is already too stale - pollWait := 0 * time.Second - // Calculate when the cached data's Age will get too stale and // need to be re-queried. When the data's Age already exceeds the // maxAge the pollWait value is left at 0 to immediately re-poll if meta.Age <= maxAge { - pollWait = maxAge - meta.Age + wait = maxAge - meta.Age } // Add a small amount of random jitter to the polling time. One @@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // and then immediately have to re-fetch again. That wouldn't // be terrible but it would expend a bunch more cpu cycles when // we can definitely avoid it. - pollWait += lib.RandomStagger(maxAge / 16) + wait += lib.RandomStagger(maxAge / 16) + } - select { - case <-time.After(pollWait): - case <-ctx.Done(): - return - } + select { + case <-time.After(wait): + case <-ctx.Done(): + return } } } diff --git a/agent/snapshot_endpoint.go b/agent/snapshot_endpoint.go index 7c1bed366..483eae18d 100644 --- a/agent/snapshot_endpoint.go +++ b/agent/snapshot_endpoint.go @@ -31,14 +31,14 @@ func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (inte // Don't bother sending any request body through since it will // be ignored. var null bytes.Buffer - if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil { + if err := s.agent.delegate.SnapshotRPC(&args, &null, resp, replyFn); err != nil { return nil, err } return nil, nil case "PUT": args.Op = structs.SnapshotRestore - if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil { + if err := s.agent.delegate.SnapshotRPC(&args, req.Body, resp, nil); err != nil { return nil, err } return nil, nil