From b6f24c6554a249a1bbc745bcdbfd409cbace0537 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 13:19:21 -0400 Subject: [PATCH 1/8] cache: extract cache eviction heap Start creating an interface that doesn't require using heap and hides more of the entry internals. --- agent/cache/cache.go | 7 +- agent/cache/cache_test.go | 3 + agent/cache/entry.go | 116 --------------------------------- agent/cache/entry_test.go | 91 -------------------------- agent/cache/eviction.go | 123 +++++++++++++++++++++++++++++++++++ agent/cache/eviction_test.go | 96 +++++++++++++++++++++++++++ 6 files changed, 224 insertions(+), 212 deletions(-) delete mode 100644 agent/cache/entry_test.go create mode 100644 agent/cache/eviction.go create mode 100644 agent/cache/eviction_test.go diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 0852393af..10792c5d5 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -406,8 +406,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL) - c.entriesExpiryHeap.Fix(entry.Expiry) + c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL) c.entriesLock.Unlock() // We purposely do not return an error here since the cache only works with @@ -689,9 +688,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { - newEntry.Expiry = &cacheEntryExpiry{Key: key} - newEntry.Expiry.Update(tEntry.Opts.LastGetTTL) - heap.Push(c.entriesExpiryHeap, newEntry.Expiry) + newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } c.entries[key] = newEntry diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8832572c3..780137e18 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1000,6 +1000,9 @@ func (t *testPartitionType) RegisterOptions() RegisterOptions { // Test that background refreshing reports correct Age in failure and happy // states. func TestCacheGet_refreshAge(t *testing.T) { + if testing.Short() { + t.Skip("too slow for -short run") + } t.Parallel() require := require.New(t) diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 440c654ba..9d7c343d7 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -1,7 +1,6 @@ package cache import ( - "container/heap" "time" "golang.org/x/time/rate" @@ -46,118 +45,3 @@ type cacheEntry struct { // FetchRateLimiter limits the rate at which fetch is called for this entry. FetchRateLimiter *rate.Limiter } - -// cacheEntryExpiry contains the expiration information for a cache -// entry. Any modifications to this struct should be done only while -// the Cache entriesLock is held. -type cacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - HeapIndex int // Index in the heap -} - -// Update the expiry to d time from now. -func (e *cacheEntryExpiry) Update(d time.Duration) { - e.Expires = time.Now().Add(d) -} - -// expiryHeap is a heap implementation that stores information about -// when entries expire. Implements container/heap.Interface. -// -// All operations on the heap and read/write of the heap contents require -// the proper entriesLock to be held on Cache. -type expiryHeap struct { - Entries []*cacheEntryExpiry - - // NotifyCh is sent a value whenever the 0 index value of the heap - // changes. This can be used to detect when the earliest value - // changes. - // - // There is a single edge case where the heap will not automatically - // send a notification: if heap.Fix is called manually and the index - // changed is 0 and the change doesn't result in any moves (stays at index - // 0), then we won't detect the change. To work around this, please - // always call the expiryHeap.Fix method instead. - NotifyCh chan struct{} -} - -// Identical to heap.Fix for this heap instance but will properly handle -// the edge case where idx == 0 and no heap modification is necessary, -// and still notify the NotifyCh. -// -// This is important for cache expiry since the expiry time may have been -// extended and if we don't send a message to the NotifyCh then we'll never -// reset the timer and the entry will be evicted early. -func (h *expiryHeap) Fix(entry *cacheEntryExpiry) { - idx := entry.HeapIndex - heap.Fix(h, idx) - - // This is the edge case we handle: if the prev (idx) and current (HeapIndex) - // is zero, it means the head-of-line didn't change while the value - // changed. Notify to reset our expiry worker. - if idx == 0 && entry.HeapIndex == 0 { - h.notify() - } -} - -func (h *expiryHeap) Len() int { return len(h.Entries) } - -func (h *expiryHeap) Swap(i, j int) { - h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] - h.Entries[i].HeapIndex = i - h.Entries[j].HeapIndex = j - - // If we're moving the 0 index, update the channel since we need - // to re-update the timer we're waiting on for the soonest expiring - // value. - if i == 0 || j == 0 { - h.notify() - } -} - -func (h *expiryHeap) Less(i, j int) bool { - // The usage of Before here is important (despite being obvious): - // this function uses the monotonic time that should be available - // on the time.Time value so the heap is immune to wall clock changes. - return h.Entries[i].Expires.Before(h.Entries[j].Expires) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Push(x interface{}) { - entry := x.(*cacheEntryExpiry) - - // Set initial heap index, if we're going to the end then Swap - // won't be called so we need to initialize - entry.HeapIndex = len(h.Entries) - - // For the first entry, we need to trigger a channel send because - // Swap won't be called; nothing to swap! We can call it right away - // because all heap operations are within a lock. - if len(h.Entries) == 0 { - h.notify() - } - - h.Entries = append(h.Entries, entry) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Pop() interface{} { - old := h.Entries - n := len(old) - x := old[n-1] - h.Entries = old[0 : n-1] - return x -} - -func (h *expiryHeap) notify() { - select { - case h.NotifyCh <- struct{}{}: - // Good - - default: - // If the send would've blocked, we just ignore it. The reason this - // is safe is because NotifyCh should always be a buffered channel. - // If this blocks, it means that there is a pending message anyways - // so the receiver will restart regardless. - } -} diff --git a/agent/cache/entry_test.go b/agent/cache/entry_test.go deleted file mode 100644 index fe4073363..000000000 --- a/agent/cache/entry_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package cache - -import ( - "container/heap" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestExpiryHeap_impl(t *testing.T) { - var _ heap.Interface = new(expiryHeap) -} - -func TestExpiryHeap(t *testing.T) { - require := require.New(t) - now := time.Now() - ch := make(chan struct{}, 10) // buffered to prevent blocking in tests - h := &expiryHeap{NotifyCh: ch} - - // Init, shouldn't trigger anything - heap.Init(h) - testNoMessage(t, ch) - - // Push an initial value, expect one message - entry := &cacheEntryExpiry{Key: "foo", HeapIndex: -1, Expires: now.Add(100)} - heap.Push(h, entry) - require.Equal(0, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes earlier than entry - entry2 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(50)} - heap.Push(h, entry2) - require.Equal(0, entry2.HeapIndex) - require.Equal(1, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes at the end - entry3 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(1000)} - heap.Push(h, entry3) - require.Equal(2, entry3.HeapIndex) - testNoMessage(t, ch) // no notify cause index 0 stayed the same - - // Remove the first entry (not Pop, since we don't use Pop, but that works too) - remove := h.Entries[0] - heap.Remove(h, remove.HeapIndex) - require.Equal(0, entry.HeapIndex) - require.Equal(1, entry3.HeapIndex) - testMessage(t, ch) - testMessage(t, ch) // we have two because two swaps happen - testNoMessage(t, ch) - - // Let's change entry 3 to be early, and fix it - entry3.Expires = now.Add(10) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) - - // Let's change entry 3 again, this is an edge case where if the 0th - // element changed, we didn't trigger the channel. Our Fix func should. - entry.Expires = now.Add(20) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) // no move - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // one message -} - -func testNoMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - t.Fatal("should not have a message") - default: - } -} - -func testMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - default: - t.Fatal("should have a message") - } -} diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go new file mode 100644 index 000000000..953929b7e --- /dev/null +++ b/agent/cache/eviction.go @@ -0,0 +1,123 @@ +package cache + +import ( + "container/heap" + "time" +) + +// cacheEntryExpiry contains the expiration time for a cache entry. +type cacheEntryExpiry struct { + Key string // Key in the cache map + Expires time.Time // Time when entry expires (monotonic clock) + HeapIndex int // Index in the heap +} + +// TODO: use or remove +func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry { + return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} +} + +// expiryHeap is a container/heap.Interface implementation that expires entries +// in the cache when their expiration time is reached. +// +// All operations on the heap and read/write of the heap contents require +// the proper entriesLock to be held on Cache. +type expiryHeap struct { + Entries []*cacheEntryExpiry + + // NotifyCh is sent a value whenever the 0 index value of the heap + // changes. This can be used to detect when the earliest value + // changes. + // + // There is a single edge case where the heap will not automatically + // send a notification: if heap.Fix is called manually and the index + // changed is 0 and the change doesn't result in any moves (stays at index + // 0), then we won't detect the change. To work around this, please + // always call the expiryHeap.Fix method instead. + NotifyCh chan struct{} +} + +func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { + entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} + heap.Push(h, entry) + return entry +} + +// Update the entry that is currently at idx with the new expiry time. The heap +// will be rebalanced after the entry is updated. +// +// Must be synchronized by the caller. +func (h *expiryHeap) Update(idx int, expiry time.Duration) { + entry := h.Entries[idx] + entry.Expires = time.Now().Add(expiry) + heap.Fix(h, idx) + + // If the previous index and current index are both zero then Fix did not + // swap the entry, and notify must be called here. + if idx == 0 && entry.HeapIndex == 0 { + h.notify() + } +} + +func (h *expiryHeap) Len() int { return len(h.Entries) } + +func (h *expiryHeap) Swap(i, j int) { + h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] + h.Entries[i].HeapIndex = i + h.Entries[j].HeapIndex = j + + // If we're moving the 0 index, update the channel since we need + // to re-update the timer we're waiting on for the soonest expiring + // value. + if i == 0 || j == 0 { + h.notify() + } +} + +func (h *expiryHeap) Less(i, j int) bool { + // The usage of Before here is important (despite being obvious): + // this function uses the monotonic time that should be available + // on the time.Time value so the heap is immune to wall clock changes. + return h.Entries[i].Expires.Before(h.Entries[j].Expires) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *expiryHeap) Push(x interface{}) { + entry := x.(*cacheEntryExpiry) + + // Set the initial heap index to the last index. If the entry is swapped it + // will have the correct set, and if it remains at the end the last index will + // be correct. + entry.HeapIndex = len(h.Entries) + + // For the first entry, we need to trigger a channel send because + // Swap won't be called; nothing to swap! We can call it right away + // because all heap operations are within a lock. + if len(h.Entries) == 0 { + h.notify() + } + + h.Entries = append(h.Entries, entry) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *expiryHeap) Pop() interface{} { + n := len(h.Entries) + entries := h.Entries + last := entries[n-1] + h.Entries = entries[0 : n-1] + return last +} + +func (h *expiryHeap) notify() { + select { + case h.NotifyCh <- struct{}{}: + // Good + + default: + // If the send would've blocked, we just ignore it. The reason this + // is safe is because NotifyCh should always be a buffered channel. + // If this blocks, it means that there is a pending message anyways + // so the receiver will restart regardless. + } +} diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go new file mode 100644 index 000000000..edb4564cf --- /dev/null +++ b/agent/cache/eviction_test.go @@ -0,0 +1,96 @@ +package cache + +import ( + "container/heap" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var _ heap.Interface = new(expiryHeap) + +func TestExpiryHeap(t *testing.T) { + require := require.New(t) + ch := make(chan struct{}, 10) // buffered to prevent blocking in tests + h := &expiryHeap{NotifyCh: ch} + var entry, entry2, entry3 *cacheEntryExpiry + + // Init, shouldn't trigger anything + heap.Init(h) + testNoMessage(t, ch) + + runStep(t, "add an entry", func(t *testing.T) { + entry = h.Add("foo", 100*time.Millisecond) + require.Equal(0, entry.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a second entry in front", func(t *testing.T) { + entry2 = h.Add("bar", 50*time.Millisecond) + require.Equal(0, entry2.HeapIndex) + require.Equal(1, entry.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a third entry at the end", func(t *testing.T) { + entry3 = h.Add("baz", 1000*time.Millisecond) + require.Equal(2, entry3.HeapIndex) + testNoMessage(t, ch) // no notify cause index 0 stayed the same + }) + + runStep(t, "remove the first entry", func(t *testing.T) { + remove := h.Entries[0] + heap.Remove(h, remove.HeapIndex) + require.Equal(0, entry.HeapIndex) + require.Equal(1, entry3.HeapIndex) + testMessage(t, ch) + testMessage(t, ch) // we have two because two swaps happen + testNoMessage(t, ch) + }) + + runStep(t, "update entry3 to expire first", func(t *testing.T) { + h.Update(entry3.HeapIndex, 10*time.Millisecond) + assert.Equal(t, 1, entry.HeapIndex) + assert.Equal(t, 0, entry3.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) + }) + + runStep(t, "0th element change triggers a notify", func(t *testing.T) { + h.Update(entry3.HeapIndex, 20) + require.Equal(1, entry.HeapIndex) // no move + require.Equal(0, entry3.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // one message + }) +} + +func testNoMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + t.Fatal("should not have a message") + default: + } +} + +func testMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + default: + t.Fatal("should have a message") + } +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + if !t.Run(name, fn) { + t.FailNow() + } +} From a96646c5627e0ad0c19f0254c6365388d95505e1 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 13:59:53 -0400 Subject: [PATCH 2/8] cache: Move more of the expiryLoop into the Heap --- agent/cache/cache.go | 41 ++++++------------------- agent/cache/eviction.go | 58 ++++++++++++++++++++++++++++++++---- agent/cache/eviction_test.go | 28 +++++++++++++++-- 3 files changed, 87 insertions(+), 40 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 10792c5d5..361f29f4d 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -15,7 +15,6 @@ package cache import ( - "container/heap" "context" "fmt" "io" @@ -166,16 +165,11 @@ func applyDefaultValuesOnOptions(options Options) Options { // Further settings can be tweaked on the returned value. func New(options Options) *Cache { options = applyDefaultValuesOnOptions(options) - // Initialize the heap. The buffer of 1 is really important because - // its possible for the expiry loop to trigger the heap to update - // itself and it'd block forever otherwise. - h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} - heap.Init(h) ctx, cancel := context.WithCancel(context.Background()) c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: h, + entriesExpiryHeap: newExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -745,47 +739,30 @@ func backOffWait(failures uint) time.Duration { // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates entries that have expired. func (c *Cache) runExpiryLoop() { - var expiryTimer *time.Timer for { - // If we have a previous timer, stop it. - if expiryTimer != nil { - expiryTimer.Stop() - } - - // Get the entry expiring soonest - var entry *cacheEntryExpiry - var expiryCh <-chan time.Time c.entriesLock.RLock() - if len(c.entriesExpiryHeap.Entries) > 0 { - entry = c.entriesExpiryHeap.Entries[0] - expiryTimer = time.NewTimer(time.Until(entry.Expires)) - expiryCh = expiryTimer.C - } + timer := c.entriesExpiryHeap.Next() c.entriesLock.RUnlock() select { case <-c.stopCh: + timer.Stop() return case <-c.entriesExpiryHeap.NotifyCh: - // Entries changed, so the heap may have changed. Restart loop. + timer.Stop() + continue - case <-expiryCh: + case <-timer.Wait(): c.entriesLock.Lock() - // Perform cleanup operations on the entry's state, if applicable. - state := c.entries[entry.Key].State - if closer, ok := state.(io.Closer); ok { + entry := timer.Entry + if closer, ok := c.entries[entry.Key].State.(io.Closer); ok { closer.Close() } // Entry expired! Remove it. delete(c.entries, entry.Key) - heap.Remove(c.entriesExpiryHeap, entry.HeapIndex) - - // This is subtle but important: if we race and simultaneously - // evict and fetch a new value, then we set this to -1 to - // have it treated as a new value so that the TTL is extended. - entry.HeapIndex = -1 + c.entriesExpiryHeap.Remove(entry.HeapIndex) // Set some metrics metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index 953929b7e..b0de07f2c 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -12,11 +12,6 @@ type cacheEntryExpiry struct { HeapIndex int // Index in the heap } -// TODO: use or remove -func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry { - return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} -} - // expiryHeap is a container/heap.Interface implementation that expires entries // in the cache when their expiration time is reached. // @@ -37,6 +32,16 @@ type expiryHeap struct { NotifyCh chan struct{} } +// Initialize the heap. The buffer of 1 is really important because +// its possible for the expiry loop to trigger the heap to update +// itself and it'd block forever otherwise. +func newExpiryHeap() *expiryHeap { + h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init(h) + return h +} + +// Must be synchronized by the caller. func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} heap.Push(h, entry) @@ -59,6 +64,18 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { } } +// Must be synchronized by the caller. +func (h *expiryHeap) Remove(idx int) { + entry := h.Entries[idx] + heap.Remove(h, idx) + + // A goroutine which is fetching a new value will have a reference to this + // entry. When it re-acquires the lock it needs to be informed that + // the entry was expired while it was fetching. Setting HeapIndex to -1 + // indicates that the entry is no longer in the heap, and must be re-added. + entry.HeapIndex = -1 +} + func (h *expiryHeap) Len() int { return len(h.Entries) } func (h *expiryHeap) Swap(i, j int) { @@ -109,6 +126,7 @@ func (h *expiryHeap) Pop() interface{} { return last } +// TODO: look at calls to notify. func (h *expiryHeap) notify() { select { case h.NotifyCh <- struct{}{}: @@ -121,3 +139,33 @@ func (h *expiryHeap) notify() { // so the receiver will restart regardless. } } + +// Must be synchronized by the caller. +func (h *expiryHeap) Next() timer { + if len(h.Entries) == 0 { + return timer{} + } + entry := h.Entries[0] + return timer{ + timer: time.NewTimer(time.Until(entry.Expires)), + Entry: entry, + } +} + +type timer struct { + timer *time.Timer + Entry *cacheEntryExpiry +} + +func (t *timer) Wait() <-chan time.Time { + if t.timer == nil { + return nil + } + return t.timer.C +} + +func (t *timer) Stop() { + if t.timer != nil { + t.timer.Stop() + } +} diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go index edb4564cf..0b200c2dd 100644 --- a/agent/cache/eviction_test.go +++ b/agent/cache/eviction_test.go @@ -43,12 +43,10 @@ func TestExpiryHeap(t *testing.T) { }) runStep(t, "remove the first entry", func(t *testing.T) { - remove := h.Entries[0] - heap.Remove(h, remove.HeapIndex) + h.Remove(0) require.Equal(0, entry.HeapIndex) require.Equal(1, entry3.HeapIndex) testMessage(t, ch) - testMessage(t, ch) // we have two because two swaps happen testNoMessage(t, ch) }) @@ -94,3 +92,27 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } + +func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: newExpiryHeap(), + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +} From d3742a1d0e7fa10a6a89445b2cb27b621d18fe6d Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 14:57:22 -0400 Subject: [PATCH 3/8] cache: Refactor heap.notify to make it more explicit. And remove duplicate notifications. Instead of performing the check in the heap implementation, check the index in the higher level interface (Add,Remove,Update) and notify if one of the relevant indexes is 0. --- agent/cache/eviction.go | 60 ++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index b0de07f2c..efd102502 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -23,12 +23,6 @@ type expiryHeap struct { // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value // changes. - // - // There is a single edge case where the heap will not automatically - // send a notification: if heap.Fix is called manually and the index - // changed is 0 and the change doesn't result in any moves (stays at index - // 0), then we won't detect the change. To work around this, please - // always call the expiryHeap.Fix method instead. NotifyCh chan struct{} } @@ -41,10 +35,22 @@ func newExpiryHeap() *expiryHeap { return h } +// Add an entry to the heap. +// // Must be synchronized by the caller. func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { - entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} + entry := &cacheEntryExpiry{ + Key: key, + Expires: time.Now().Add(expiry), + // Set the initial heap index to the last index. If the entry is swapped it + // will have the correct index set, and if it remains at the end the last + // index will be correct. + HeapIndex: len(h.Entries), + } heap.Push(h, entry) + if entry.HeapIndex == 0 { + h.notify() + } return entry } @@ -59,7 +65,7 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. - if idx == 0 && entry.HeapIndex == 0 { + if idx == 0 || entry.HeapIndex == 0 { h.notify() } } @@ -74,6 +80,10 @@ func (h *expiryHeap) Remove(idx int) { // the entry was expired while it was fetching. Setting HeapIndex to -1 // indicates that the entry is no longer in the heap, and must be re-added. entry.HeapIndex = -1 + + if idx == 0 { + h.notify() + } } func (h *expiryHeap) Len() int { return len(h.Entries) } @@ -82,13 +92,6 @@ func (h *expiryHeap) Swap(i, j int) { h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] h.Entries[i].HeapIndex = i h.Entries[j].HeapIndex = j - - // If we're moving the 0 index, update the channel since we need - // to re-update the timer we're waiting on for the soonest expiring - // value. - if i == 0 || j == 0 { - h.notify() - } } func (h *expiryHeap) Less(i, j int) bool { @@ -100,21 +103,7 @@ func (h *expiryHeap) Less(i, j int) bool { // heap.Interface, this isn't expected to be called directly. func (h *expiryHeap) Push(x interface{}) { - entry := x.(*cacheEntryExpiry) - - // Set the initial heap index to the last index. If the entry is swapped it - // will have the correct set, and if it remains at the end the last index will - // be correct. - entry.HeapIndex = len(h.Entries) - - // For the first entry, we need to trigger a channel send because - // Swap won't be called; nothing to swap! We can call it right away - // because all heap operations are within a lock. - if len(h.Entries) == 0 { - h.notify() - } - - h.Entries = append(h.Entries, entry) + h.Entries = append(h.Entries, x.(*cacheEntryExpiry)) } // heap.Interface, this isn't expected to be called directly. @@ -126,17 +115,14 @@ func (h *expiryHeap) Pop() interface{} { return last } -// TODO: look at calls to notify. +// notify the timer that the head value has changed, so the expiry time has +// also likely changed. func (h *expiryHeap) notify() { + // Send to channel without blocking. Skips sending if there is already + // an item in the buffered channel. select { case h.NotifyCh <- struct{}{}: - // Good - default: - // If the send would've blocked, we just ignore it. The reason this - // is safe is because NotifyCh should always be a buffered channel. - // If this blocks, it means that there is a pending message anyways - // so the receiver will restart regardless. } } From 909b8e674eaf625a95fae81b79922551949d30ee Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 17:38:12 -0400 Subject: [PATCH 4/8] cache: export ExpiryHeap and hide internal methods on an unexported type, so that when it is extrated those methods are not exported. --- agent/cache/cache.go | 6 +-- agent/cache/entry.go | 4 +- agent/cache/eviction.go | 84 ++++++++++++++++++------------------ agent/cache/eviction_test.go | 29 ++++++------- 4 files changed, 61 insertions(+), 62 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 361f29f4d..03b9df8b9 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -87,7 +87,7 @@ type Cache struct { // internal storage format so changing this should be possible safely. entriesLock sync.RWMutex entries map[string]cacheEntry - entriesExpiryHeap *expiryHeap + entriesExpiryHeap *ExpiryHeap // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. @@ -169,7 +169,7 @@ func New(options Options) *Cache { c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: newExpiryHeap(), + entriesExpiryHeap: NewExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -803,7 +803,7 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key}, + Expiry: &CacheEntryExpiry{Key: key}, FetchRateLimiter: rate.NewLimiter( c.options.EntryFetchRate, c.options.EntryFetchMaxBurst, diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 9d7c343d7..fca1f20d1 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -30,8 +30,8 @@ type cacheEntry struct { // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the - // expiryHeap as well. - Expiry *cacheEntryExpiry + // ExpiryHeap as well. + Expiry *CacheEntryExpiry // FetchedAt stores the time the cache entry was retrieved for determining // it's age later. diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index efd102502..579f13fef 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -5,20 +5,20 @@ import ( "time" ) -// cacheEntryExpiry contains the expiration time for a cache entry. -type cacheEntryExpiry struct { +// CacheEntryExpiry contains the expiration time for a cache entry. +type CacheEntryExpiry struct { Key string // Key in the cache map Expires time.Time // Time when entry expires (monotonic clock) HeapIndex int // Index in the heap } -// expiryHeap is a container/heap.Interface implementation that expires entries +// ExpiryHeap is a container/heap.Interface implementation that expires entries // in the cache when their expiration time is reached. // // All operations on the heap and read/write of the heap contents require // the proper entriesLock to be held on Cache. -type expiryHeap struct { - Entries []*cacheEntryExpiry +type ExpiryHeap struct { + entries []*CacheEntryExpiry // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value @@ -29,25 +29,25 @@ type expiryHeap struct { // Initialize the heap. The buffer of 1 is really important because // its possible for the expiry loop to trigger the heap to update // itself and it'd block forever otherwise. -func newExpiryHeap() *expiryHeap { - h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} - heap.Init(h) +func NewExpiryHeap() *ExpiryHeap { + h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init((*entryHeap)(h)) return h } // Add an entry to the heap. // // Must be synchronized by the caller. -func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { - entry := &cacheEntryExpiry{ +func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { + entry := &CacheEntryExpiry{ Key: key, Expires: time.Now().Add(expiry), // Set the initial heap index to the last index. If the entry is swapped it // will have the correct index set, and if it remains at the end the last // index will be correct. - HeapIndex: len(h.Entries), + HeapIndex: len(h.entries), } - heap.Push(h, entry) + heap.Push((*entryHeap)(h), entry) if entry.HeapIndex == 0 { h.notify() } @@ -58,10 +58,10 @@ func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { // will be rebalanced after the entry is updated. // // Must be synchronized by the caller. -func (h *expiryHeap) Update(idx int, expiry time.Duration) { - entry := h.Entries[idx] +func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { + entry := h.entries[idx] entry.Expires = time.Now().Add(expiry) - heap.Fix(h, idx) + heap.Fix((*entryHeap)(h), idx) // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. @@ -71,9 +71,9 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { } // Must be synchronized by the caller. -func (h *expiryHeap) Remove(idx int) { - entry := h.Entries[idx] - heap.Remove(h, idx) +func (h *ExpiryHeap) Remove(idx int) { + entry := h.entries[idx] + heap.Remove((*entryHeap)(h), idx) // A goroutine which is fetching a new value will have a reference to this // entry. When it re-acquires the lock it needs to be informed that @@ -86,38 +86,40 @@ func (h *expiryHeap) Remove(idx int) { } } -func (h *expiryHeap) Len() int { return len(h.Entries) } +type entryHeap ExpiryHeap -func (h *expiryHeap) Swap(i, j int) { - h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] - h.Entries[i].HeapIndex = i - h.Entries[j].HeapIndex = j +func (h *entryHeap) Len() int { return len(h.entries) } + +func (h *entryHeap) Swap(i, j int) { + h.entries[i], h.entries[j] = h.entries[j], h.entries[i] + h.entries[i].HeapIndex = i + h.entries[j].HeapIndex = j } -func (h *expiryHeap) Less(i, j int) bool { +func (h *entryHeap) Less(i, j int) bool { // The usage of Before here is important (despite being obvious): // this function uses the monotonic time that should be available // on the time.Time value so the heap is immune to wall clock changes. - return h.Entries[i].Expires.Before(h.Entries[j].Expires) + return h.entries[i].Expires.Before(h.entries[j].Expires) } // heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Push(x interface{}) { - h.Entries = append(h.Entries, x.(*cacheEntryExpiry)) +func (h *entryHeap) Push(x interface{}) { + h.entries = append(h.entries, x.(*CacheEntryExpiry)) } // heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Pop() interface{} { - n := len(h.Entries) - entries := h.Entries +func (h *entryHeap) Pop() interface{} { + n := len(h.entries) + entries := h.entries last := entries[n-1] - h.Entries = entries[0 : n-1] + h.entries = entries[0 : n-1] return last } // notify the timer that the head value has changed, so the expiry time has // also likely changed. -func (h *expiryHeap) notify() { +func (h *ExpiryHeap) notify() { // Send to channel without blocking. Skips sending if there is already // an item in the buffered channel. select { @@ -127,30 +129,30 @@ func (h *expiryHeap) notify() { } // Must be synchronized by the caller. -func (h *expiryHeap) Next() timer { - if len(h.Entries) == 0 { - return timer{} +func (h *ExpiryHeap) Next() Timer { + if len(h.entries) == 0 { + return Timer{} } - entry := h.Entries[0] - return timer{ + entry := h.entries[0] + return Timer{ timer: time.NewTimer(time.Until(entry.Expires)), Entry: entry, } } -type timer struct { +type Timer struct { timer *time.Timer - Entry *cacheEntryExpiry + Entry *CacheEntryExpiry } -func (t *timer) Wait() <-chan time.Time { +func (t *Timer) Wait() <-chan time.Time { if t.timer == nil { return nil } return t.timer.C } -func (t *timer) Stop() { +func (t *Timer) Stop() { if t.timer != nil { t.timer.Stop() } diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go index 0b200c2dd..1612b2947 100644 --- a/agent/cache/eviction_test.go +++ b/agent/cache/eviction_test.go @@ -6,46 +6,43 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -var _ heap.Interface = new(expiryHeap) +var _ heap.Interface = (*entryHeap)(nil) func TestExpiryHeap(t *testing.T) { - require := require.New(t) - ch := make(chan struct{}, 10) // buffered to prevent blocking in tests - h := &expiryHeap{NotifyCh: ch} - var entry, entry2, entry3 *cacheEntryExpiry + h := NewExpiryHeap() + ch := h.NotifyCh + var entry, entry2, entry3 *CacheEntryExpiry // Init, shouldn't trigger anything - heap.Init(h) testNoMessage(t, ch) runStep(t, "add an entry", func(t *testing.T) { entry = h.Add("foo", 100*time.Millisecond) - require.Equal(0, entry.HeapIndex) + assert.Equal(t, 0, entry.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a second entry in front", func(t *testing.T) { entry2 = h.Add("bar", 50*time.Millisecond) - require.Equal(0, entry2.HeapIndex) - require.Equal(1, entry.HeapIndex) + assert.Equal(t, 0, entry2.HeapIndex) + assert.Equal(t, 1, entry.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a third entry at the end", func(t *testing.T) { entry3 = h.Add("baz", 1000*time.Millisecond) - require.Equal(2, entry3.HeapIndex) + assert.Equal(t, 2, entry3.HeapIndex) testNoMessage(t, ch) // no notify cause index 0 stayed the same }) runStep(t, "remove the first entry", func(t *testing.T) { h.Remove(0) - require.Equal(0, entry.HeapIndex) - require.Equal(1, entry3.HeapIndex) + assert.Equal(t, 0, entry.HeapIndex) + assert.Equal(t, 1, entry3.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) }) @@ -60,8 +57,8 @@ func TestExpiryHeap(t *testing.T) { runStep(t, "0th element change triggers a notify", func(t *testing.T) { h.Update(entry3.HeapIndex, 20) - require.Equal(1, entry.HeapIndex) // no move - require.Equal(0, entry3.HeapIndex) + assert.Equal(t, 1, entry.HeapIndex) // no move + assert.Equal(t, 0, entry3.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // one message }) @@ -97,7 +94,7 @@ func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { c := &Cache{ stopCh: make(chan struct{}), entries: make(map[string]cacheEntry), - entriesExpiryHeap: newExpiryHeap(), + entriesExpiryHeap: NewExpiryHeap(), } chStart := make(chan struct{}) chDone := make(chan struct{}) From 9d5b738cdbbcb7ec445e88fe80ca08f94af6ff1f Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 18:34:42 -0400 Subject: [PATCH 5/8] lib/ttlcache: extract package from agent/cache --- agent/cache/cache.go | 13 ++-- agent/cache/cache_test.go | 25 ++++++++ agent/cache/entry.go | 4 +- {agent/cache => lib/ttlcache}/eviction.go | 63 +++++++++++-------- .../cache => lib/ttlcache}/eviction_test.go | 52 +++++---------- 5 files changed, 86 insertions(+), 71 deletions(-) rename {agent/cache => lib/ttlcache}/eviction.go (74%) rename {agent/cache => lib/ttlcache}/eviction_test.go (60%) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 03b9df8b9..c7b7bcfc1 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -26,6 +26,8 @@ import ( "github.com/armon/go-metrics" "golang.org/x/time/rate" + "github.com/hashicorp/consul/lib/ttlcache" + "github.com/hashicorp/consul/lib" ) @@ -87,7 +89,7 @@ type Cache struct { // internal storage format so changing this should be possible safely. entriesLock sync.RWMutex entries map[string]cacheEntry - entriesExpiryHeap *ExpiryHeap + entriesExpiryHeap *ttlcache.ExpiryHeap // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. @@ -169,7 +171,7 @@ func New(options Options) *Cache { c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: NewExpiryHeap(), + entriesExpiryHeap: ttlcache.NewExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -400,7 +402,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL) + c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL) c.entriesLock.Unlock() // We purposely do not return an error here since the cache only works with @@ -681,7 +683,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // If this is a new entry (not in the heap yet), then setup the // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. - if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { + if newEntry.Expiry == nil || newEntry.Expiry.Index() == -1 { newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } @@ -762,7 +764,7 @@ func (c *Cache) runExpiryLoop() { // Entry expired! Remove it. delete(c.entries, entry.Key) - c.entriesExpiryHeap.Remove(entry.HeapIndex) + c.entriesExpiryHeap.Remove(entry.Index()) // Set some metrics metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) @@ -803,7 +805,6 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &CacheEntryExpiry{Key: key}, FetchRateLimiter: rate.NewLimiter( c.options.EntryFetchRate, c.options.EntryFetchMaxBurst, diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 780137e18..8988b3bf6 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/time/rate" + "github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/sdk/testutil" ) @@ -1405,3 +1406,27 @@ OUT: } } } + +func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: ttlcache.NewExpiryHeap(), + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +} diff --git a/agent/cache/entry.go b/agent/cache/entry.go index fca1f20d1..0c71e9443 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -4,6 +4,8 @@ import ( "time" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/ttlcache" ) // cacheEntry stores a single cache entry. @@ -31,7 +33,7 @@ type cacheEntry struct { // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the // ExpiryHeap as well. - Expiry *CacheEntryExpiry + Expiry *ttlcache.Entry // FetchedAt stores the time the cache entry was retrieved for determining // it's age later. diff --git a/agent/cache/eviction.go b/lib/ttlcache/eviction.go similarity index 74% rename from agent/cache/eviction.go rename to lib/ttlcache/eviction.go index 579f13fef..c04855a9b 100644 --- a/agent/cache/eviction.go +++ b/lib/ttlcache/eviction.go @@ -1,15 +1,24 @@ -package cache +package ttlcache import ( "container/heap" "time" ) -// CacheEntryExpiry contains the expiration time for a cache entry. -type CacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - HeapIndex int // Index in the heap +// Entry in the ExpiryHeap, tracks the index and expiry time of an item in a +// ttl cache. +type Entry struct { + // TODO: can Key be unexported? + Key string + expiry time.Time + heapIndex int +} + +func (c *Entry) Index() int { + if c == nil { + return -1 + } + return c.heapIndex } // ExpiryHeap is a container/heap.Interface implementation that expires entries @@ -18,7 +27,7 @@ type CacheEntryExpiry struct { // All operations on the heap and read/write of the heap contents require // the proper entriesLock to be held on Cache. type ExpiryHeap struct { - entries []*CacheEntryExpiry + entries []*Entry // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value @@ -26,9 +35,7 @@ type ExpiryHeap struct { NotifyCh chan struct{} } -// Initialize the heap. The buffer of 1 is really important because -// its possible for the expiry loop to trigger the heap to update -// itself and it'd block forever otherwise. +// NewExpiryHeap creates and returns a new ExpiryHeap. func NewExpiryHeap() *ExpiryHeap { h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)} heap.Init((*entryHeap)(h)) @@ -38,17 +45,17 @@ func NewExpiryHeap() *ExpiryHeap { // Add an entry to the heap. // // Must be synchronized by the caller. -func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { - entry := &CacheEntryExpiry{ - Key: key, - Expires: time.Now().Add(expiry), +func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { + entry := &Entry{ + Key: key, + expiry: time.Now().Add(expiry), // Set the initial heap index to the last index. If the entry is swapped it // will have the correct index set, and if it remains at the end the last // index will be correct. - HeapIndex: len(h.entries), + heapIndex: len(h.entries), } heap.Push((*entryHeap)(h), entry) - if entry.HeapIndex == 0 { + if entry.heapIndex == 0 { h.notify() } return entry @@ -60,16 +67,18 @@ func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { // Must be synchronized by the caller. func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { entry := h.entries[idx] - entry.Expires = time.Now().Add(expiry) + entry.expiry = time.Now().Add(expiry) heap.Fix((*entryHeap)(h), idx) // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. - if idx == 0 || entry.HeapIndex == 0 { + if idx == 0 || entry.heapIndex == 0 { h.notify() } } +// Remove the entry at idx from the heap. +// // Must be synchronized by the caller. func (h *ExpiryHeap) Remove(idx int) { entry := h.entries[idx] @@ -77,9 +86,9 @@ func (h *ExpiryHeap) Remove(idx int) { // A goroutine which is fetching a new value will have a reference to this // entry. When it re-acquires the lock it needs to be informed that - // the entry was expired while it was fetching. Setting HeapIndex to -1 + // the entry was expired while it was fetching. Setting heapIndex to -1 // indicates that the entry is no longer in the heap, and must be re-added. - entry.HeapIndex = -1 + entry.heapIndex = -1 if idx == 0 { h.notify() @@ -92,20 +101,20 @@ func (h *entryHeap) Len() int { return len(h.entries) } func (h *entryHeap) Swap(i, j int) { h.entries[i], h.entries[j] = h.entries[j], h.entries[i] - h.entries[i].HeapIndex = i - h.entries[j].HeapIndex = j + h.entries[i].heapIndex = i + h.entries[j].heapIndex = j } func (h *entryHeap) Less(i, j int) bool { // The usage of Before here is important (despite being obvious): // this function uses the monotonic time that should be available // on the time.Time value so the heap is immune to wall clock changes. - return h.entries[i].Expires.Before(h.entries[j].Expires) + return h.entries[i].expiry.Before(h.entries[j].expiry) } // heap.Interface, this isn't expected to be called directly. func (h *entryHeap) Push(x interface{}) { - h.entries = append(h.entries, x.(*CacheEntryExpiry)) + h.entries = append(h.entries, x.(*Entry)) } // heap.Interface, this isn't expected to be called directly. @@ -128,6 +137,8 @@ func (h *ExpiryHeap) notify() { } } +// Next returns a Timer that waits until the first entry in the heap expires. +// // Must be synchronized by the caller. func (h *ExpiryHeap) Next() Timer { if len(h.entries) == 0 { @@ -135,14 +146,14 @@ func (h *ExpiryHeap) Next() Timer { } entry := h.entries[0] return Timer{ - timer: time.NewTimer(time.Until(entry.Expires)), + timer: time.NewTimer(time.Until(entry.expiry)), Entry: entry, } } type Timer struct { timer *time.Timer - Entry *CacheEntryExpiry + Entry *Entry } func (t *Timer) Wait() <-chan time.Time { diff --git a/agent/cache/eviction_test.go b/lib/ttlcache/eviction_test.go similarity index 60% rename from agent/cache/eviction_test.go rename to lib/ttlcache/eviction_test.go index 1612b2947..1fdd8e485 100644 --- a/agent/cache/eviction_test.go +++ b/lib/ttlcache/eviction_test.go @@ -1,4 +1,4 @@ -package cache +package ttlcache import ( "container/heap" @@ -13,52 +13,52 @@ var _ heap.Interface = (*entryHeap)(nil) func TestExpiryHeap(t *testing.T) { h := NewExpiryHeap() ch := h.NotifyCh - var entry, entry2, entry3 *CacheEntryExpiry + var entry, entry2, entry3 *Entry // Init, shouldn't trigger anything testNoMessage(t, ch) runStep(t, "add an entry", func(t *testing.T) { entry = h.Add("foo", 100*time.Millisecond) - assert.Equal(t, 0, entry.HeapIndex) + assert.Equal(t, 0, entry.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a second entry in front", func(t *testing.T) { entry2 = h.Add("bar", 50*time.Millisecond) - assert.Equal(t, 0, entry2.HeapIndex) - assert.Equal(t, 1, entry.HeapIndex) + assert.Equal(t, 0, entry2.heapIndex) + assert.Equal(t, 1, entry.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a third entry at the end", func(t *testing.T) { entry3 = h.Add("baz", 1000*time.Millisecond) - assert.Equal(t, 2, entry3.HeapIndex) + assert.Equal(t, 2, entry3.heapIndex) testNoMessage(t, ch) // no notify cause index 0 stayed the same }) runStep(t, "remove the first entry", func(t *testing.T) { h.Remove(0) - assert.Equal(t, 0, entry.HeapIndex) - assert.Equal(t, 1, entry3.HeapIndex) + assert.Equal(t, 0, entry.heapIndex) + assert.Equal(t, 1, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) }) runStep(t, "update entry3 to expire first", func(t *testing.T) { - h.Update(entry3.HeapIndex, 10*time.Millisecond) - assert.Equal(t, 1, entry.HeapIndex) - assert.Equal(t, 0, entry3.HeapIndex) + h.Update(entry3.heapIndex, 10*time.Millisecond) + assert.Equal(t, 1, entry.heapIndex) + assert.Equal(t, 0, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) }) runStep(t, "0th element change triggers a notify", func(t *testing.T) { - h.Update(entry3.HeapIndex, 20) - assert.Equal(t, 1, entry.HeapIndex) // no move - assert.Equal(t, 0, entry3.HeapIndex) + h.Update(entry3.heapIndex, 20) + assert.Equal(t, 1, entry.heapIndex) // no move + assert.Equal(t, 0, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // one message }) @@ -89,27 +89,3 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } - -func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { - c := &Cache{ - stopCh: make(chan struct{}), - entries: make(map[string]cacheEntry), - entriesExpiryHeap: NewExpiryHeap(), - } - chStart := make(chan struct{}) - chDone := make(chan struct{}) - go func() { - close(chStart) - c.runExpiryLoop() - close(chDone) - }() - - <-chStart - close(c.stopCh) - - select { - case <-chDone: - case <-time.After(50 * time.Millisecond): - t.Fatalf("expected loop to exit when stopped") - } -} From 0beaced90fd24631bd65b2f2336d581b300ae1c9 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 18:58:30 -0400 Subject: [PATCH 6/8] cache: fix a bug with Prepopulate Prepopulate was setting entry.Expiry.HeapIndex to 0. Previously this would result in a call to heap.Fix(0) which wasn't correct, but was also not really a problem because at worse it would re-notify. With the recent change to extract cachettl it was changed to call Update(idx), which would have updated the wrong entry. A previous commit removed the setting of entry.Expiry so that the HeapIndex would be reported as -1, and this commit adds a test and handles the -1 heap index. --- agent/cache/cache_test.go | 46 +++++++++++++++++++++++++++++++++++++++ lib/ttlcache/eviction.go | 4 ++++ 2 files changed, 50 insertions(+) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8988b3bf6..8b77773f9 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1430,3 +1430,49 @@ func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) { t.Fatalf("expected loop to exit when stopped") } } + +func TestCache_Prepopulate(t *testing.T) { + typ := &fakeType{index: 5} + c := New(Options{}) + c.RegisterType("t", typ) + + c.Prepopulate("t", FetchResult{Value: 17, Index: 1}, "dc1", "token", "v1") + + ctx := context.Background() + req := fakeRequest{ + info: RequestInfo{ + Key: "v1", + Token: "token", + Datacenter: "dc1", + MinIndex: 1, + }, + } + result, _, err := c.Get(ctx, "t", req) + require.NoError(t, err) + require.Equal(t, 17, result) +} + +type fakeType struct { + index uint64 +} + +func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) { + idx := atomic.LoadUint64(&f.index) + return FetchResult{Value: int(idx * 2), Index: idx}, nil +} + +func (f fakeType) RegisterOptions() RegisterOptions { + return RegisterOptions{Refresh: true} +} + +var _ Type = (*fakeType)(nil) + +type fakeRequest struct { + info RequestInfo +} + +func (f fakeRequest) CacheInfo() RequestInfo { + return f.info +} + +var _ Request = (*fakeRequest)(nil) diff --git a/lib/ttlcache/eviction.go b/lib/ttlcache/eviction.go index c04855a9b..df9624bdc 100644 --- a/lib/ttlcache/eviction.go +++ b/lib/ttlcache/eviction.go @@ -66,6 +66,10 @@ func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { // // Must be synchronized by the caller. func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { + if idx < 0 { + // the previous entry did not have a valid index, its not in the heap + return + } entry := h.entries[idx] entry.expiry = time.Now().Add(expiry) heap.Fix((*entryHeap)(h), idx) From 2601998766091a2e7f635eb18edf2663b98ef843 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 19:00:24 -0400 Subject: [PATCH 7/8] lib/ttlcache: add a constant for NotIndexed --- agent/cache/cache.go | 2 +- lib/ttlcache/eviction.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index c7b7bcfc1..dd6d09234 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -683,7 +683,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // If this is a new entry (not in the heap yet), then setup the // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. - if newEntry.Expiry == nil || newEntry.Expiry.Index() == -1 { + if newEntry.Expiry == nil || newEntry.Expiry.Index() == ttlcache.NotIndexed { newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } diff --git a/lib/ttlcache/eviction.go b/lib/ttlcache/eviction.go index df9624bdc..3c704edeb 100644 --- a/lib/ttlcache/eviction.go +++ b/lib/ttlcache/eviction.go @@ -14,9 +14,13 @@ type Entry struct { heapIndex int } +// NotIndexed indicates that the entry does not exist in the heap. Either because +// it is nil, or because it was removed. +const NotIndexed = -1 + func (c *Entry) Index() int { if c == nil { - return -1 + return NotIndexed } return c.heapIndex } @@ -66,7 +70,7 @@ func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { // // Must be synchronized by the caller. func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { - if idx < 0 { + if idx == NotIndexed { // the previous entry did not have a valid index, its not in the heap return } @@ -92,7 +96,7 @@ func (h *ExpiryHeap) Remove(idx int) { // entry. When it re-acquires the lock it needs to be informed that // the entry was expired while it was fetching. Setting heapIndex to -1 // indicates that the entry is no longer in the heap, and must be re-added. - entry.heapIndex = -1 + entry.heapIndex = NotIndexed if idx == 0 { h.notify() From 09d62f1df09b5baf9dc2a27960d1523fdee908f0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 20 Oct 2020 19:09:51 -0400 Subject: [PATCH 8/8] lib/ttlcache: unexport key and additional godoc --- agent/cache/cache.go | 7 +++---- lib/ttlcache/eviction.go | 37 +++++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index dd6d09234..1a5193792 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -26,9 +26,8 @@ import ( "github.com/armon/go-metrics" "golang.org/x/time/rate" - "github.com/hashicorp/consul/lib/ttlcache" - "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/ttlcache" ) //go:generate mockery -all -inpkg @@ -758,12 +757,12 @@ func (c *Cache) runExpiryLoop() { c.entriesLock.Lock() entry := timer.Entry - if closer, ok := c.entries[entry.Key].State.(io.Closer); ok { + if closer, ok := c.entries[entry.Key()].State.(io.Closer); ok { closer.Close() } // Entry expired! Remove it. - delete(c.entries, entry.Key) + delete(c.entries, entry.Key()) c.entriesExpiryHeap.Remove(entry.Index()) // Set some metrics diff --git a/lib/ttlcache/eviction.go b/lib/ttlcache/eviction.go index 3c704edeb..d07b2bf46 100644 --- a/lib/ttlcache/eviction.go +++ b/lib/ttlcache/eviction.go @@ -1,3 +1,8 @@ +/* +Package ttlcache provides an ExpiryHeap that can be used by a cache to track the +expiration time of its entries. When an expiry is reached the Timer will fire +and the entry can be removed. +*/ package ttlcache import ( @@ -8,8 +13,7 @@ import ( // Entry in the ExpiryHeap, tracks the index and expiry time of an item in a // ttl cache. type Entry struct { - // TODO: can Key be unexported? - Key string + key string expiry time.Time heapIndex int } @@ -18,18 +22,25 @@ type Entry struct { // it is nil, or because it was removed. const NotIndexed = -1 -func (c *Entry) Index() int { - if c == nil { +// Index returns the index of this entry within the heap. +func (e *Entry) Index() int { + if e == nil { return NotIndexed } - return c.heapIndex + return e.heapIndex } -// ExpiryHeap is a container/heap.Interface implementation that expires entries -// in the cache when their expiration time is reached. +// Key returns the key for the entry in the heap. +func (e *Entry) Key() string { + return e.key +} + +// ExpiryHeap is a heap that is ordered by the expiry time of entries. It may +// be used by a cache or storage to expiry items after a TTL. // -// All operations on the heap and read/write of the heap contents require -// the proper entriesLock to be held on Cache. +// ExpiryHeap expects the caller to synchronize calls to most of its methods. This +// is necessary because the cache needs to ensure that updates to both its +// storage and the ExpiryHeap are synchronized. type ExpiryHeap struct { entries []*Entry @@ -51,7 +62,7 @@ func NewExpiryHeap() *ExpiryHeap { // Must be synchronized by the caller. func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { entry := &Entry{ - Key: key, + key: key, expiry: time.Now().Add(expiry), // Set the initial heap index to the last index. If the entry is swapped it // will have the correct index set, and if it remains at the end the last @@ -159,6 +170,12 @@ func (h *ExpiryHeap) Next() Timer { } } +// Timer provides a channel to block on. When the Wait channel receives an +// item the Timer.Entry has expired. The caller is expected to call +// ExpiryHeap.Remove with the Entry.Index(). +// +// The caller is responsible for calling Stop to stop the timer if the timer has +// not fired. type Timer struct { timer *time.Timer Entry *Entry