cache: export ExpiryHeap

and hide internal methods on an unexported type, so that when it is extrated those methods are not exported.
This commit is contained in:
Daniel Nephin 2020-10-20 17:38:12 -04:00
parent d3742a1d0e
commit 909b8e674e
4 changed files with 61 additions and 62 deletions

View file

@ -87,7 +87,7 @@ type Cache struct {
// internal storage format so changing this should be possible safely. // internal storage format so changing this should be possible safely.
entriesLock sync.RWMutex entriesLock sync.RWMutex
entries map[string]cacheEntry entries map[string]cacheEntry
entriesExpiryHeap *expiryHeap entriesExpiryHeap *ExpiryHeap
// stopped is used as an atomic flag to signal that the Cache has been // stopped is used as an atomic flag to signal that the Cache has been
// discarded so background fetches and expiry processing should stop. // discarded so background fetches and expiry processing should stop.
@ -169,7 +169,7 @@ func New(options Options) *Cache {
c := &Cache{ c := &Cache{
types: make(map[string]typeEntry), types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry), entries: make(map[string]cacheEntry),
entriesExpiryHeap: newExpiryHeap(), entriesExpiryHeap: NewExpiryHeap(),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
options: options, options: options,
rateLimitContext: ctx, rateLimitContext: ctx,
@ -803,7 +803,7 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
Index: res.Index, Index: res.Index,
FetchedAt: time.Now(), FetchedAt: time.Now(),
Waiter: make(chan struct{}), Waiter: make(chan struct{}),
Expiry: &cacheEntryExpiry{Key: key}, Expiry: &CacheEntryExpiry{Key: key},
FetchRateLimiter: rate.NewLimiter( FetchRateLimiter: rate.NewLimiter(
c.options.EntryFetchRate, c.options.EntryFetchRate,
c.options.EntryFetchMaxBurst, c.options.EntryFetchMaxBurst,

View file

@ -30,8 +30,8 @@ type cacheEntry struct {
// Expiry contains information about the expiration of this // Expiry contains information about the expiration of this
// entry. This is a pointer as its shared as a value in the // entry. This is a pointer as its shared as a value in the
// expiryHeap as well. // ExpiryHeap as well.
Expiry *cacheEntryExpiry Expiry *CacheEntryExpiry
// FetchedAt stores the time the cache entry was retrieved for determining // FetchedAt stores the time the cache entry was retrieved for determining
// it's age later. // it's age later.

View file

@ -5,20 +5,20 @@ import (
"time" "time"
) )
// cacheEntryExpiry contains the expiration time for a cache entry. // CacheEntryExpiry contains the expiration time for a cache entry.
type cacheEntryExpiry struct { type CacheEntryExpiry struct {
Key string // Key in the cache map Key string // Key in the cache map
Expires time.Time // Time when entry expires (monotonic clock) Expires time.Time // Time when entry expires (monotonic clock)
HeapIndex int // Index in the heap HeapIndex int // Index in the heap
} }
// expiryHeap is a container/heap.Interface implementation that expires entries // ExpiryHeap is a container/heap.Interface implementation that expires entries
// in the cache when their expiration time is reached. // in the cache when their expiration time is reached.
// //
// All operations on the heap and read/write of the heap contents require // All operations on the heap and read/write of the heap contents require
// the proper entriesLock to be held on Cache. // the proper entriesLock to be held on Cache.
type expiryHeap struct { type ExpiryHeap struct {
Entries []*cacheEntryExpiry entries []*CacheEntryExpiry
// NotifyCh is sent a value whenever the 0 index value of the heap // NotifyCh is sent a value whenever the 0 index value of the heap
// changes. This can be used to detect when the earliest value // changes. This can be used to detect when the earliest value
@ -29,25 +29,25 @@ type expiryHeap struct {
// Initialize the heap. The buffer of 1 is really important because // Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update // its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise. // itself and it'd block forever otherwise.
func newExpiryHeap() *expiryHeap { func NewExpiryHeap() *ExpiryHeap {
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)}
heap.Init(h) heap.Init((*entryHeap)(h))
return h return h
} }
// Add an entry to the heap. // Add an entry to the heap.
// //
// Must be synchronized by the caller. // Must be synchronized by the caller.
func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry {
entry := &cacheEntryExpiry{ entry := &CacheEntryExpiry{
Key: key, Key: key,
Expires: time.Now().Add(expiry), Expires: time.Now().Add(expiry),
// Set the initial heap index to the last index. If the entry is swapped it // Set the initial heap index to the last index. If the entry is swapped it
// will have the correct index set, and if it remains at the end the last // will have the correct index set, and if it remains at the end the last
// index will be correct. // index will be correct.
HeapIndex: len(h.Entries), HeapIndex: len(h.entries),
} }
heap.Push(h, entry) heap.Push((*entryHeap)(h), entry)
if entry.HeapIndex == 0 { if entry.HeapIndex == 0 {
h.notify() h.notify()
} }
@ -58,10 +58,10 @@ func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry {
// will be rebalanced after the entry is updated. // will be rebalanced after the entry is updated.
// //
// Must be synchronized by the caller. // Must be synchronized by the caller.
func (h *expiryHeap) Update(idx int, expiry time.Duration) { func (h *ExpiryHeap) Update(idx int, expiry time.Duration) {
entry := h.Entries[idx] entry := h.entries[idx]
entry.Expires = time.Now().Add(expiry) entry.Expires = time.Now().Add(expiry)
heap.Fix(h, idx) heap.Fix((*entryHeap)(h), idx)
// If the previous index and current index are both zero then Fix did not // If the previous index and current index are both zero then Fix did not
// swap the entry, and notify must be called here. // swap the entry, and notify must be called here.
@ -71,9 +71,9 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) {
} }
// Must be synchronized by the caller. // Must be synchronized by the caller.
func (h *expiryHeap) Remove(idx int) { func (h *ExpiryHeap) Remove(idx int) {
entry := h.Entries[idx] entry := h.entries[idx]
heap.Remove(h, idx) heap.Remove((*entryHeap)(h), idx)
// A goroutine which is fetching a new value will have a reference to this // A goroutine which is fetching a new value will have a reference to this
// entry. When it re-acquires the lock it needs to be informed that // entry. When it re-acquires the lock it needs to be informed that
@ -86,38 +86,40 @@ func (h *expiryHeap) Remove(idx int) {
} }
} }
func (h *expiryHeap) Len() int { return len(h.Entries) } type entryHeap ExpiryHeap
func (h *expiryHeap) Swap(i, j int) { func (h *entryHeap) Len() int { return len(h.entries) }
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
h.Entries[i].HeapIndex = i func (h *entryHeap) Swap(i, j int) {
h.Entries[j].HeapIndex = j h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
h.entries[i].HeapIndex = i
h.entries[j].HeapIndex = j
} }
func (h *expiryHeap) Less(i, j int) bool { func (h *entryHeap) Less(i, j int) bool {
// The usage of Before here is important (despite being obvious): // The usage of Before here is important (despite being obvious):
// this function uses the monotonic time that should be available // this function uses the monotonic time that should be available
// on the time.Time value so the heap is immune to wall clock changes. // on the time.Time value so the heap is immune to wall clock changes.
return h.Entries[i].Expires.Before(h.Entries[j].Expires) return h.entries[i].Expires.Before(h.entries[j].Expires)
} }
// heap.Interface, this isn't expected to be called directly. // heap.Interface, this isn't expected to be called directly.
func (h *expiryHeap) Push(x interface{}) { func (h *entryHeap) Push(x interface{}) {
h.Entries = append(h.Entries, x.(*cacheEntryExpiry)) h.entries = append(h.entries, x.(*CacheEntryExpiry))
} }
// heap.Interface, this isn't expected to be called directly. // heap.Interface, this isn't expected to be called directly.
func (h *expiryHeap) Pop() interface{} { func (h *entryHeap) Pop() interface{} {
n := len(h.Entries) n := len(h.entries)
entries := h.Entries entries := h.entries
last := entries[n-1] last := entries[n-1]
h.Entries = entries[0 : n-1] h.entries = entries[0 : n-1]
return last return last
} }
// notify the timer that the head value has changed, so the expiry time has // notify the timer that the head value has changed, so the expiry time has
// also likely changed. // also likely changed.
func (h *expiryHeap) notify() { func (h *ExpiryHeap) notify() {
// Send to channel without blocking. Skips sending if there is already // Send to channel without blocking. Skips sending if there is already
// an item in the buffered channel. // an item in the buffered channel.
select { select {
@ -127,30 +129,30 @@ func (h *expiryHeap) notify() {
} }
// Must be synchronized by the caller. // Must be synchronized by the caller.
func (h *expiryHeap) Next() timer { func (h *ExpiryHeap) Next() Timer {
if len(h.Entries) == 0 { if len(h.entries) == 0 {
return timer{} return Timer{}
} }
entry := h.Entries[0] entry := h.entries[0]
return timer{ return Timer{
timer: time.NewTimer(time.Until(entry.Expires)), timer: time.NewTimer(time.Until(entry.Expires)),
Entry: entry, Entry: entry,
} }
} }
type timer struct { type Timer struct {
timer *time.Timer timer *time.Timer
Entry *cacheEntryExpiry Entry *CacheEntryExpiry
} }
func (t *timer) Wait() <-chan time.Time { func (t *Timer) Wait() <-chan time.Time {
if t.timer == nil { if t.timer == nil {
return nil return nil
} }
return t.timer.C return t.timer.C
} }
func (t *timer) Stop() { func (t *Timer) Stop() {
if t.timer != nil { if t.timer != nil {
t.timer.Stop() t.timer.Stop()
} }

View file

@ -6,46 +6,43 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
var _ heap.Interface = new(expiryHeap) var _ heap.Interface = (*entryHeap)(nil)
func TestExpiryHeap(t *testing.T) { func TestExpiryHeap(t *testing.T) {
require := require.New(t) h := NewExpiryHeap()
ch := make(chan struct{}, 10) // buffered to prevent blocking in tests ch := h.NotifyCh
h := &expiryHeap{NotifyCh: ch} var entry, entry2, entry3 *CacheEntryExpiry
var entry, entry2, entry3 *cacheEntryExpiry
// Init, shouldn't trigger anything // Init, shouldn't trigger anything
heap.Init(h)
testNoMessage(t, ch) testNoMessage(t, ch)
runStep(t, "add an entry", func(t *testing.T) { runStep(t, "add an entry", func(t *testing.T) {
entry = h.Add("foo", 100*time.Millisecond) entry = h.Add("foo", 100*time.Millisecond)
require.Equal(0, entry.HeapIndex) assert.Equal(t, 0, entry.HeapIndex)
testMessage(t, ch) testMessage(t, ch)
testNoMessage(t, ch) // exactly one asserted above testNoMessage(t, ch) // exactly one asserted above
}) })
runStep(t, "add a second entry in front", func(t *testing.T) { runStep(t, "add a second entry in front", func(t *testing.T) {
entry2 = h.Add("bar", 50*time.Millisecond) entry2 = h.Add("bar", 50*time.Millisecond)
require.Equal(0, entry2.HeapIndex) assert.Equal(t, 0, entry2.HeapIndex)
require.Equal(1, entry.HeapIndex) assert.Equal(t, 1, entry.HeapIndex)
testMessage(t, ch) testMessage(t, ch)
testNoMessage(t, ch) // exactly one asserted above testNoMessage(t, ch) // exactly one asserted above
}) })
runStep(t, "add a third entry at the end", func(t *testing.T) { runStep(t, "add a third entry at the end", func(t *testing.T) {
entry3 = h.Add("baz", 1000*time.Millisecond) entry3 = h.Add("baz", 1000*time.Millisecond)
require.Equal(2, entry3.HeapIndex) assert.Equal(t, 2, entry3.HeapIndex)
testNoMessage(t, ch) // no notify cause index 0 stayed the same testNoMessage(t, ch) // no notify cause index 0 stayed the same
}) })
runStep(t, "remove the first entry", func(t *testing.T) { runStep(t, "remove the first entry", func(t *testing.T) {
h.Remove(0) h.Remove(0)
require.Equal(0, entry.HeapIndex) assert.Equal(t, 0, entry.HeapIndex)
require.Equal(1, entry3.HeapIndex) assert.Equal(t, 1, entry3.HeapIndex)
testMessage(t, ch) testMessage(t, ch)
testNoMessage(t, ch) testNoMessage(t, ch)
}) })
@ -60,8 +57,8 @@ func TestExpiryHeap(t *testing.T) {
runStep(t, "0th element change triggers a notify", func(t *testing.T) { runStep(t, "0th element change triggers a notify", func(t *testing.T) {
h.Update(entry3.HeapIndex, 20) h.Update(entry3.HeapIndex, 20)
require.Equal(1, entry.HeapIndex) // no move assert.Equal(t, 1, entry.HeapIndex) // no move
require.Equal(0, entry3.HeapIndex) assert.Equal(t, 0, entry3.HeapIndex)
testMessage(t, ch) testMessage(t, ch)
testNoMessage(t, ch) // one message testNoMessage(t, ch) // one message
}) })
@ -97,7 +94,7 @@ func TestExpiryLoop_ExitsWhenStopped(t *testing.T) {
c := &Cache{ c := &Cache{
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
entries: make(map[string]cacheEntry), entries: make(map[string]cacheEntry),
entriesExpiryHeap: newExpiryHeap(), entriesExpiryHeap: NewExpiryHeap(),
} }
chStart := make(chan struct{}) chStart := make(chan struct{})
chDone := make(chan struct{}) chDone := make(chan struct{})