submatview: avoid sorting results unnecessarily

Previously, getFromView would call view.Result even when the result would not be returned
(because the index had not yet exceeded the minIndex). This would allocate a slice and sort it
for no reason, because the values would never be returned.

Fix this by re-ordering the operations in getFromView.

The test changes in this commit were an attempt to cover the case where
an update is received but the index does not exceed the minIndex.
This commit is contained in:
Daniel Nephin 2021-04-27 18:15:57 -04:00
parent cf8520d85c
commit 872bb9db14
3 changed files with 41 additions and 30 deletions

View File

@ -235,8 +235,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
// If our index is > req.Index return right away. If index is zero then we // If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on // haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here the // the update chan.
// request min index might be different and from initial user request.
if result.Index > 0 && result.Index > minIndex { if result.Index > 0 && result.Index > minIndex {
return result, nil return result, nil
} }
@ -248,26 +247,28 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
m.lock.Lock() m.lock.Lock()
result.Index = m.index result.Index = m.index
if m.err != nil { switch {
case m.err != nil:
err := m.err err := m.err
m.lock.Unlock() m.lock.Unlock()
return result, err return result, err
} case result.Index <= minIndex:
// get a reference to the new updateCh, the previous one was closed
result.Value = m.view.Result(m.index) updateCh = m.updateCh
// Grab the new updateCh in case we need to keep waiting for the next update. m.lock.Unlock()
updateCh = m.updateCh
m.lock.Unlock()
if result.Index <= minIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue continue
} }
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, nil return result, nil
case <-ctx.Done(): case <-ctx.Done():
// Update the result value to the latest because callers may still
// use the value when the error is context.DeadlineExceeded
m.lock.Lock()
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, ctx.Err() return result, ctx.Err()
} }
} }

View File

@ -154,10 +154,6 @@ func (s *Store) Notify(
case ctx.Err() != nil: case ctx.Err() != nil:
return return
case err != nil: case err != nil:
// TODO: cache.Notify sends errors on updateCh, should this do the same?
// It seems like only fetch errors would ever get sent along and eventually
// logged, so sending may not provide any benefit here.
s.logger.Warn("handling error in Store.Notify", s.logger.Warn("handling error in Store.Notify",
"error", err, "error", err,
"request-type", req.Type(), "request-type", req.Type(),
@ -170,7 +166,6 @@ func (s *Store) Notify(
CorrelationID: correlationID, CorrelationID: correlationID,
Result: result.Value, Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index}, Meta: cache.ResultMeta{Index: result.Index},
Err: err,
} }
select { select {
case updateCh <- u: case updateCh <- u:

View File

@ -77,14 +77,28 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}) })
runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { chResult := make(chan resultOrError, 1)
req.index = 23 req.index = 40
go func() {
result, err := store.Get(ctx, req)
chResult <- resultOrError{Result: result, Err: err}
}()
chResult := make(chan resultOrError, 1) runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
go func() { select {
result, err := store.Get(ctx, req) case <-chResult:
chResult <- resultOrError{Result: result, Err: err} t.Fatalf("expected Get to block")
}() case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
})
runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
select { select {
case <-chResult: case <-chResult:
@ -96,9 +110,10 @@ func TestStore_Get(t *testing.T) {
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock() store.lock.Unlock()
require.Equal(t, 1, e.requests) require.Equal(t, 1, e.requests)
})
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1"))
var getResult resultOrError var getResult resultOrError
select { select {
case getResult = <-chResult: case getResult = <-chResult:
@ -107,17 +122,17 @@ func TestStore_Get(t *testing.T) {
} }
require.NoError(t, getResult.Err) require.NoError(t, getResult.Err)
require.Equal(t, uint64(24), getResult.Result.Index) require.Equal(t, uint64(41), getResult.Result.Index)
r, ok := getResult.Result.Value.(fakeResult) r, ok := getResult.Result.Value.(fakeResult)
require.True(t, ok) require.True(t, ok)
require.Len(t, r.srvs, 2) require.Len(t, r.srvs, 2)
require.Equal(t, uint64(24), r.index) require.Equal(t, uint64(41), r.index)
store.lock.Lock() store.lock.Lock()
defer store.lock.Unlock() defer store.lock.Unlock()
require.Len(t, store.byKey, 1) require.Len(t, store.byKey, 1)
e = store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index()) require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests) require.Equal(t, 0, e.requests)