cache: Move more of the expiryLoop into the Heap

This commit is contained in:
Daniel Nephin 2020-10-20 13:59:53 -04:00
parent b6f24c6554
commit a96646c562
3 changed files with 87 additions and 40 deletions

41
agent/cache/cache.go vendored
View file

@ -15,7 +15,6 @@
package cache package cache
import ( import (
"container/heap"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -166,16 +165,11 @@ func applyDefaultValuesOnOptions(options Options) Options {
// Further settings can be tweaked on the returned value. // Further settings can be tweaked on the returned value.
func New(options Options) *Cache { func New(options Options) *Cache {
options = applyDefaultValuesOnOptions(options) options = applyDefaultValuesOnOptions(options)
// Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise.
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
heap.Init(h)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
c := &Cache{ c := &Cache{
types: make(map[string]typeEntry), types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry), entries: make(map[string]cacheEntry),
entriesExpiryHeap: h, entriesExpiryHeap: newExpiryHeap(),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
options: options, options: options,
rateLimitContext: ctx, rateLimitContext: ctx,
@ -745,47 +739,30 @@ func backOffWait(failures uint) time.Duration {
// runExpiryLoop is a blocking function that watches the expiration // runExpiryLoop is a blocking function that watches the expiration
// heap and invalidates entries that have expired. // heap and invalidates entries that have expired.
func (c *Cache) runExpiryLoop() { func (c *Cache) runExpiryLoop() {
var expiryTimer *time.Timer
for { for {
// If we have a previous timer, stop it.
if expiryTimer != nil {
expiryTimer.Stop()
}
// Get the entry expiring soonest
var entry *cacheEntryExpiry
var expiryCh <-chan time.Time
c.entriesLock.RLock() c.entriesLock.RLock()
if len(c.entriesExpiryHeap.Entries) > 0 { timer := c.entriesExpiryHeap.Next()
entry = c.entriesExpiryHeap.Entries[0]
expiryTimer = time.NewTimer(time.Until(entry.Expires))
expiryCh = expiryTimer.C
}
c.entriesLock.RUnlock() c.entriesLock.RUnlock()
select { select {
case <-c.stopCh: case <-c.stopCh:
timer.Stop()
return return
case <-c.entriesExpiryHeap.NotifyCh: case <-c.entriesExpiryHeap.NotifyCh:
// Entries changed, so the heap may have changed. Restart loop. timer.Stop()
continue
case <-expiryCh: case <-timer.Wait():
c.entriesLock.Lock() c.entriesLock.Lock()
// Perform cleanup operations on the entry's state, if applicable. entry := timer.Entry
state := c.entries[entry.Key].State if closer, ok := c.entries[entry.Key].State.(io.Closer); ok {
if closer, ok := state.(io.Closer); ok {
closer.Close() closer.Close()
} }
// Entry expired! Remove it. // Entry expired! Remove it.
delete(c.entries, entry.Key) delete(c.entries, entry.Key)
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex) c.entriesExpiryHeap.Remove(entry.HeapIndex)
// This is subtle but important: if we race and simultaneously
// evict and fetch a new value, then we set this to -1 to
// have it treated as a new value so that the TTL is extended.
entry.HeapIndex = -1
// Set some metrics // Set some metrics
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)

View file

@ -12,11 +12,6 @@ type cacheEntryExpiry struct {
HeapIndex int // Index in the heap HeapIndex int // Index in the heap
} }
// TODO: use or remove
func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry {
return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
}
// expiryHeap is a container/heap.Interface implementation that expires entries // expiryHeap is a container/heap.Interface implementation that expires entries
// in the cache when their expiration time is reached. // in the cache when their expiration time is reached.
// //
@ -37,6 +32,16 @@ type expiryHeap struct {
NotifyCh chan struct{} NotifyCh chan struct{}
} }
// Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise.
func newExpiryHeap() *expiryHeap {
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
heap.Init(h)
return h
}
// Must be synchronized by the caller.
func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry {
entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
heap.Push(h, entry) heap.Push(h, entry)
@ -59,6 +64,18 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) {
} }
} }
// Must be synchronized by the caller.
func (h *expiryHeap) Remove(idx int) {
entry := h.Entries[idx]
heap.Remove(h, idx)
// A goroutine which is fetching a new value will have a reference to this
// entry. When it re-acquires the lock it needs to be informed that
// the entry was expired while it was fetching. Setting HeapIndex to -1
// indicates that the entry is no longer in the heap, and must be re-added.
entry.HeapIndex = -1
}
func (h *expiryHeap) Len() int { return len(h.Entries) } func (h *expiryHeap) Len() int { return len(h.Entries) }
func (h *expiryHeap) Swap(i, j int) { func (h *expiryHeap) Swap(i, j int) {
@ -109,6 +126,7 @@ func (h *expiryHeap) Pop() interface{} {
return last return last
} }
// TODO: look at calls to notify.
func (h *expiryHeap) notify() { func (h *expiryHeap) notify() {
select { select {
case h.NotifyCh <- struct{}{}: case h.NotifyCh <- struct{}{}:
@ -121,3 +139,33 @@ func (h *expiryHeap) notify() {
// so the receiver will restart regardless. // so the receiver will restart regardless.
} }
} }
// Must be synchronized by the caller.
func (h *expiryHeap) Next() timer {
if len(h.Entries) == 0 {
return timer{}
}
entry := h.Entries[0]
return timer{
timer: time.NewTimer(time.Until(entry.Expires)),
Entry: entry,
}
}
type timer struct {
timer *time.Timer
Entry *cacheEntryExpiry
}
func (t *timer) Wait() <-chan time.Time {
if t.timer == nil {
return nil
}
return t.timer.C
}
func (t *timer) Stop() {
if t.timer != nil {
t.timer.Stop()
}
}

View file

@ -43,12 +43,10 @@ func TestExpiryHeap(t *testing.T) {
}) })
runStep(t, "remove the first entry", func(t *testing.T) { runStep(t, "remove the first entry", func(t *testing.T) {
remove := h.Entries[0] h.Remove(0)
heap.Remove(h, remove.HeapIndex)
require.Equal(0, entry.HeapIndex) require.Equal(0, entry.HeapIndex)
require.Equal(1, entry3.HeapIndex) require.Equal(1, entry3.HeapIndex)
testMessage(t, ch) testMessage(t, ch)
testMessage(t, ch) // we have two because two swaps happen
testNoMessage(t, ch) testNoMessage(t, ch)
}) })
@ -94,3 +92,27 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow() t.FailNow()
} }
} }
func TestExpiryLoop_ExitsWhenStopped(t *testing.T) {
c := &Cache{
stopCh: make(chan struct{}),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: newExpiryHeap(),
}
chStart := make(chan struct{})
chDone := make(chan struct{})
go func() {
close(chStart)
c.runExpiryLoop()
close(chDone)
}()
<-chStart
close(c.stopCh)
select {
case <-chDone:
case <-time.After(50 * time.Millisecond):
t.Fatalf("expected loop to exit when stopped")
}
}