agent/cache: support timeouts for cache reads and empty fetch results
This commit is contained in:
parent
b0f70f17db
commit
4509589427
44
agent/cache/cache.go
vendored
44
agent/cache/cache.go
vendored
|
@ -136,6 +136,9 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
|
|||
// First time through
|
||||
first := true
|
||||
|
||||
// timeoutCh for watching our tmeout
|
||||
var timeoutCh <-chan time.Time
|
||||
|
||||
RETRY_GET:
|
||||
// Get the current value
|
||||
c.entriesLock.RLock()
|
||||
|
@ -164,16 +167,27 @@ RETRY_GET:
|
|||
// No longer our first time through
|
||||
first = false
|
||||
|
||||
// Set our timeout channel if we must
|
||||
if info.Timeout > 0 && timeoutCh == nil {
|
||||
timeoutCh = time.After(info.Timeout)
|
||||
}
|
||||
|
||||
// At this point, we know we either don't have a value at all or the
|
||||
// value we have is too old. We need to wait for new data.
|
||||
waiter, err := c.fetch(t, key, r)
|
||||
waiterCh, err := c.fetch(t, key, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wait on our waiter and then retry the cache load
|
||||
<-waiter
|
||||
goto RETRY_GET
|
||||
select {
|
||||
case <-waiterCh:
|
||||
// Our fetch returned, retry the get from the cache
|
||||
goto RETRY_GET
|
||||
|
||||
case <-timeoutCh:
|
||||
// Timeout on the cache read, just return whatever we have.
|
||||
return entry.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
// entryKey returns the key for the entry in the cache. See the note
|
||||
|
@ -216,16 +230,26 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
|
|||
// The actual Fetch must be performed in a goroutine.
|
||||
go func() {
|
||||
// Start building the new entry by blocking on the fetch.
|
||||
var newEntry cacheEntry
|
||||
result, err := tEntry.Type.Fetch(FetchOptions{
|
||||
MinIndex: entry.Index,
|
||||
}, r)
|
||||
newEntry.Value = result.Value
|
||||
newEntry.Index = result.Index
|
||||
newEntry.Error = err
|
||||
|
||||
// This is a valid entry with a result
|
||||
newEntry.Valid = true
|
||||
var newEntry cacheEntry
|
||||
if result.Value == nil {
|
||||
// If no value was set, then we do not change the prior entry.
|
||||
// Instead, we just update the waiter to be new so that another
|
||||
// Get will wait on the correct value.
|
||||
newEntry = entry
|
||||
newEntry.Fetching = false
|
||||
} else {
|
||||
// A new value was given, so we create a brand new entry.
|
||||
newEntry.Value = result.Value
|
||||
newEntry.Index = result.Index
|
||||
newEntry.Error = err
|
||||
|
||||
// This is a valid entry with a result
|
||||
newEntry.Valid = true
|
||||
}
|
||||
|
||||
// Create a new waiter that will be used for the next fetch.
|
||||
newEntry.Waiter = make(chan struct{})
|
||||
|
|
71
agent/cache/cache_test.go
vendored
71
agent/cache/cache_test.go
vendored
|
@ -194,6 +194,77 @@ func TestCacheGet_blockingIndex(t *testing.T) {
|
|||
TestCacheGetChResult(t, resultCh, 42)
|
||||
}
|
||||
|
||||
// Test a get with an index set will timeout if the fetch doesn't return
|
||||
// anything.
|
||||
func TestCacheGet_blockingIndexTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestType(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, nil)
|
||||
|
||||
// Configure the type
|
||||
triggerCh := make(chan time.Time)
|
||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
|
||||
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once()
|
||||
typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh)
|
||||
|
||||
// Fetch should block
|
||||
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
||||
Key: "hello", MinIndex: 5, Timeout: 200 * time.Millisecond}))
|
||||
|
||||
// Should block
|
||||
select {
|
||||
case <-resultCh:
|
||||
t.Fatal("should block")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Should return after more of the timeout
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
require.Equal(t, 12, result)
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
t.Fatal("should've returned")
|
||||
}
|
||||
}
|
||||
|
||||
// Test that if a Type returns an empty value on Fetch that the previous
|
||||
// value is preserved.
|
||||
func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
typ := TestType(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, nil)
|
||||
|
||||
// Configure the type
|
||||
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1)
|
||||
typ.Static(FetchResult{Value: nil}, nil)
|
||||
|
||||
// Get, should fetch
|
||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, err := c.Get("t", req)
|
||||
require.Nil(err)
|
||||
require.Equal(42, result)
|
||||
|
||||
// Get, should not fetch since we already have a satisfying value
|
||||
req = TestRequest(t, RequestInfo{
|
||||
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
||||
result, err = c.Get("t", req)
|
||||
require.Nil(err)
|
||||
require.Equal(42, result)
|
||||
|
||||
// Sleep a tiny bit just to let maybe some background calls happen
|
||||
// then verify that we still only got the one call
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
typ.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// Test that a type registered with a periodic refresh will perform
|
||||
// that refresh after the timer is up.
|
||||
func TestCacheGet_periodicRefresh(t *testing.T) {
|
||||
|
|
9
agent/cache/request.go
vendored
9
agent/cache/request.go
vendored
|
@ -1,5 +1,9 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Request is a cache-able request.
|
||||
//
|
||||
// This interface is typically implemented by request structures in
|
||||
|
@ -36,4 +40,9 @@ type RequestInfo struct {
|
|||
// to block until new data is available. If no index is available, the
|
||||
// default value (zero) is acceptable.
|
||||
MinIndex uint64
|
||||
|
||||
// Timeout is the timeout for waiting on a blocking query. When the
|
||||
// timeout is reached, the last known value is returned (or maybe nil
|
||||
// if there was no prior value).
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
|
6
agent/cache/type.go
vendored
6
agent/cache/type.go
vendored
|
@ -15,6 +15,12 @@ type Type interface {
|
|||
//
|
||||
// The return value is a FetchResult which contains information about
|
||||
// the fetch.
|
||||
//
|
||||
// On timeout, FetchResult can behave one of two ways. First, it can
|
||||
// return the last known value. This is the default behavior of blocking
|
||||
// RPC calls in Consul so this allows cache types to be implemented with
|
||||
// no extra logic. Second, FetchResult can return an unset value and index.
|
||||
// In this case, the cache will reuse the last value automatically.
|
||||
Fetch(FetchOptions, Request) (FetchResult, error)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue