submatview: more test cases for Store.Get

And simplify the return value by using a Result type.
This commit is contained in:
Daniel Nephin 2021-02-23 15:02:28 -05:00
parent f7190b1c61
commit 54a402d772
3 changed files with 105 additions and 26 deletions

View file

@ -215,17 +215,17 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{}) m.updateCh = make(chan struct{})
} }
type viewResult struct { type Result struct {
Index uint64 Index uint64
Value interface{} Value interface{}
} }
// getFromView blocks until the index of the View is greater than opts.MinIndex, // getFromView blocks until the index of the View is greater than opts.MinIndex,
//or the context is cancelled. //or the context is cancelled.
func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (viewResult, error) { func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result, error) {
m.lock.Lock() m.lock.Lock()
result := viewResult{ result := Result{
Index: m.index, Index: m.index,
Value: m.view.Result(m.index), Value: m.view.Result(m.index),
} }

View file

@ -25,7 +25,6 @@ type entry struct {
requests int requests int
} }
// TODO: start expiration loop
func NewStore() *Store { func NewStore() *Store {
return &Store{ return &Store{
byKey: make(map[string]entry), byKey: make(map[string]entry),
@ -77,11 +76,7 @@ type Request interface {
// Get a value from the store, blocking if the store has not yet seen the // Get a value from the store, blocking if the store has not yet seen the
// req.Index value. // req.Index value.
// See agent/cache.Cache.Get for complete documentation. // See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get( func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
ctx context.Context,
req Request,
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
) (interface{}, cache.ResultMeta, error) {
info := req.CacheInfo() info := req.CacheInfo()
key, e := s.getEntry(req) key, e := s.getEntry(req)
defer s.releaseEntry(key) defer s.releaseEntry(key)
@ -94,7 +89,7 @@ func (s *Store) Get(
// TODO: does context.DeadlineExceeded need to be translated into a nil error // TODO: does context.DeadlineExceeded need to be translated into a nil error
// to match the old interface? // to match the old interface?
return result.Value, cache.ResultMeta{Index: result.Index}, err return result, err
} }
// Notify the updateCh when there are updates to the entry identified by req. // Notify the updateCh when there are updates to the entry identified by req.

View file

@ -15,7 +15,7 @@ import (
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
) )
func TestStore_Get_Fresh(t *testing.T) { func TestStore_Get(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -30,25 +30,102 @@ func TestStore_Get_Fresh(t *testing.T) {
newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1")) newEventServiceHealthRegister(22, 2, "srv1"))
result, md, err := store.Get(ctx, req) runStep(t, "from empty store, starts materializer", func(t *testing.T) {
result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(22), md.Index) require.Equal(t, uint64(22), result.Index)
r, ok := result.(fakeResult) r, ok := result.Value.(fakeResult)
require.True(t, ok) require.True(t, ok)
require.Len(t, r.srvs, 2) require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index) require.Equal(t, uint64(22), r.index)
store.lock.Lock() store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1) require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index()) require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
defer store.lock.Unlock()
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
runStep(t, "with an index that already exists in the view", func(t *testing.T) {
req.index = 21
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(22), result.Index)
r, ok := result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
req.index = 23
chResult := make(chan resultOrError, 1)
go func() {
result, err := store.Get(ctx, req)
chResult <- resultOrError{Result: result, Err: err}
}()
select {
case <-chResult:
t.Fatalf("expected Get to block")
case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
var getResult resultOrError
select {
case getResult = <-chResult:
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected Get to unblock when new events are received")
}
require.NoError(t, getResult.Err)
require.Equal(t, uint64(24), getResult.Result.Index)
r, ok := getResult.Result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(24), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e = store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
}
type resultOrError struct {
Result Result
Err error
} }
type fakeRequest struct { type fakeRequest struct {
index uint64
client *TestStreamingClient client *TestStreamingClient
} }
@ -58,6 +135,7 @@ func (r *fakeRequest) CacheInfo() cache.RequestInfo {
Token: "abcd", Token: "abcd",
Datacenter: "dc1", Datacenter: "dc1",
Timeout: 4 * time.Second, Timeout: 4 * time.Second,
MinIndex: r.index,
} }
} }
@ -125,8 +203,7 @@ func (f *fakeView) Reset() {
f.srvs = make(map[string]*pbservice.CheckServiceNode) f.srvs = make(map[string]*pbservice.CheckServiceNode)
} }
// TODO: Get with an entry that already has index // TODO: Get with Notify
// TODO: Get with an entry that is not yet at index
func TestStore_Notify(t *testing.T) { func TestStore_Notify(t *testing.T) {
// TODO: Notify with no existing entry // TODO: Notify with no existing entry
@ -134,3 +211,10 @@ func TestStore_Notify(t *testing.T) {
// TODO: Notify multiple times same key // TODO: Notify multiple times same key
// TODO: Notify no update if index is not past MinIndex. // TODO: Notify no update if index is not past MinIndex.
} }
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}