From 872bb9db149e8817930a57c03a61c643e5224a35 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Tue, 27 Apr 2021 18:15:57 -0400
Subject: [PATCH] submatview: avoid sorting results unnecessarily

Previously, getFromView would call view.Result even when the result was
not going to be returned (because the index had not yet advanced past
the minIndex). This would allocate a slice and sort it for no reason,
because the values would never be returned.

Fix this by re-ordering the operations in getFromView.

The test changes in this commit were an attempt to cover the case where
an update is received but the index does not exceed the minIndex.
---
 agent/submatview/materializer.go | 27 +++++++++++-----------
 agent/submatview/store.go        |  5 ----
 agent/submatview/store_test.go   | 39 ++++++++++++++++++++++----------
 3 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go
index 413b5d3f1..51402987d 100644
--- a/agent/submatview/materializer.go
+++ b/agent/submatview/materializer.go
@@ -235,8 +235,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result

     // If our index is > req.Index return right away. If index is zero then we
     // haven't loaded a snapshot at all yet which means we should wait for one on
-    // the update chan. Note it's opts.MinIndex that the cache is using here the
-    // request min index might be different and from initial user request.
+    // the update chan.
     if result.Index > 0 && result.Index > minIndex {
         return result, nil
     }
@@ -248,26 +247,28 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
             m.lock.Lock()
             result.Index = m.index

-            if m.err != nil {
+            switch {
+            case m.err != nil:
                 err := m.err
                 m.lock.Unlock()
                 return result, err
-            }
-
-            result.Value = m.view.Result(m.index)
-            // Grab the new updateCh in case we need to keep waiting for the next update.
-            updateCh = m.updateCh
-            m.lock.Unlock()
-
-            if result.Index <= minIndex {
-                // The result is still older/same as the requested index, continue to
-                // wait for further updates.
+            case result.Index <= minIndex:
+                // get a reference to the new updateCh, the previous one was closed
+                updateCh = m.updateCh
+                m.lock.Unlock()
                 continue
             }

+            result.Value = m.view.Result(m.index)
+            m.lock.Unlock()
             return result, nil

         case <-ctx.Done():
+            // Update the result value to the latest because callers may still
+            // use the value when the error is context.DeadlineExceeded
+            m.lock.Lock()
+            result.Value = m.view.Result(m.index)
+            m.lock.Unlock()
             return result, ctx.Err()
         }
     }
diff --git a/agent/submatview/store.go b/agent/submatview/store.go
index aaf083d94..e7f9a18e7 100644
--- a/agent/submatview/store.go
+++ b/agent/submatview/store.go
@@ -154,10 +154,6 @@ func (s *Store) Notify(
         case ctx.Err() != nil:
             return
         case err != nil:
-            // TODO: cache.Notify sends errors on updateCh, should this do the same?
-            // It seems like only fetch errors would ever get sent along and eventually
-            // logged, so sending may not provide any benefit here.
-
             s.logger.Warn("handling error in Store.Notify",
                 "error", err,
                 "request-type", req.Type(),
@@ -170,7 +166,6 @@ func (s *Store) Notify(
             CorrelationID: correlationID,
             Result:        result.Value,
             Meta:          cache.ResultMeta{Index: result.Index},
-            Err:           err,
         }
         select {
         case updateCh <- u:
diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go
index 4f1e4d11a..ea0e01c2b 100644
--- a/agent/submatview/store_test.go
+++ b/agent/submatview/store_test.go
@@ -77,14 +77,28 @@ func TestStore_Get(t *testing.T) {
         require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
     })

-    runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
-        req.index = 23
+    chResult := make(chan resultOrError, 1)
+    req.index = 40
+    go func() {
+        result, err := store.Get(ctx, req)
+        chResult <- resultOrError{Result: result, Err: err}
+    }()

-        chResult := make(chan resultOrError, 1)
-        go func() {
-            result, err := store.Get(ctx, req)
-            chResult <- resultOrError{Result: result, Err: err}
-        }()
+    runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
+        select {
+        case <-chResult:
+            t.Fatalf("expected Get to block")
+        case <-time.After(50 * time.Millisecond):
+        }
+
+        store.lock.Lock()
+        e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
+        store.lock.Unlock()
+        require.Equal(t, 1, e.requests)
+    })
+
+    runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) {
+        req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))

         select {
         case <-chResult:
@@ -96,9 +110,10 @@ func TestStore_Get(t *testing.T) {
         e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
         store.lock.Unlock()
         require.Equal(t, 1, e.requests)
+    })

-        req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
-
+    runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) {
+        req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1"))
         var getResult resultOrError
         select {
         case getResult = <-chResult:
@@ -107,17 +122,17 @@
         }

         require.NoError(t, getResult.Err)
-        require.Equal(t, uint64(24), getResult.Result.Index)
+        require.Equal(t, uint64(41), getResult.Result.Index)

         r, ok := getResult.Result.Value.(fakeResult)
         require.True(t, ok)
         require.Len(t, r.srvs, 2)
-        require.Equal(t, uint64(24), r.index)
+        require.Equal(t, uint64(41), r.index)

         store.lock.Lock()
         defer store.lock.Unlock()
         require.Len(t, store.byKey, 1)
-        e = store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
+        e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
         require.Equal(t, 0, e.expiry.Index())
         require.Equal(t, 0, e.requests)
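
Below is a minimal standalone sketch of the reordering the commit message describes: check the index before materializing the value, so a result that will be discarded never pays for the allocate-and-sort. The names fakeView and getOnceUpdated are invented for illustration and are not the Consul types; the actual change is the getFromView diff above.

package main

import "fmt"

// fakeView stands in for a materializer view. Result is the costly call
// that allocates (and, in the real code, would also sort) a slice.
type fakeView struct{ items []string }

func (v *fakeView) Result() []string {
    return append([]string(nil), v.items...) // allocation happens here
}

// getOnceUpdated mimics the reordered logic: check the index first and
// only materialize the value when it will actually be returned.
func getOnceUpdated(v *fakeView, index, minIndex uint64) ([]string, bool) {
    if index <= minIndex {
        // The caller keeps waiting for a newer index; Result was never called.
        return nil, false
    }
    return v.Result(), true
}

func main() {
    v := &fakeView{items: []string{"srv1", "srv2"}}

    if _, ok := getOnceUpdated(v, 24, 40); !ok {
        fmt.Println("index 24 <= minIndex 40: still blocked, no allocation or sort")
    }
    if res, ok := getOnceUpdated(v, 41, 40); ok {
        fmt.Println("index 41 > minIndex 40: returned", res)
    }
}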