diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 413b5d3f1..51402987d 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -235,8 +235,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result // If our index is > req.Index return right away. If index is zero then we // haven't loaded a snapshot at all yet which means we should wait for one on - // the update chan. Note it's opts.MinIndex that the cache is using here the - // request min index might be different and from initial user request. + // the update chan. if result.Index > 0 && result.Index > minIndex { return result, nil } @@ -248,26 +247,28 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result m.lock.Lock() result.Index = m.index - if m.err != nil { + switch { + case m.err != nil: err := m.err m.lock.Unlock() return result, err - } - - result.Value = m.view.Result(m.index) - // Grab the new updateCh in case we need to keep waiting for the next update. - updateCh = m.updateCh - m.lock.Unlock() - - if result.Index <= minIndex { - // The result is still older/same as the requested index, continue to - // wait for further updates. + case result.Index <= minIndex: + // get a reference to the new updateCh, the previous one was closed + updateCh = m.updateCh + m.lock.Unlock() continue } + result.Value = m.view.Result(m.index) + m.lock.Unlock() return result, nil case <-ctx.Done(): + // Update the result value to the latest because callers may still + // use the value when the error is context.DeadlineExceeded + m.lock.Lock() + result.Value = m.view.Result(m.index) + m.lock.Unlock() return result, ctx.Err() } } diff --git a/agent/submatview/store.go b/agent/submatview/store.go index aaf083d94..e7f9a18e7 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -154,10 +154,6 @@ func (s *Store) Notify( case ctx.Err() != nil: return case err != nil: - // TODO: cache.Notify sends errors on updateCh, should this do the same? - // It seems like only fetch errors would ever get sent along and eventually - // logged, so sending may not provide any benefit here. - s.logger.Warn("handling error in Store.Notify", "error", err, "request-type", req.Type(), @@ -170,7 +166,6 @@ func (s *Store) Notify( CorrelationID: correlationID, Result: result.Value, Meta: cache.ResultMeta{Index: result.Index}, - Err: err, } select { case updateCh <- u: diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 4f1e4d11a..ea0e01c2b 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -77,14 +77,28 @@ func TestStore_Get(t *testing.T) { require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) }) - runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { - req.index = 23 + chResult := make(chan resultOrError, 1) + req.index = 40 + go func() { + result, err := store.Get(ctx, req) + chResult <- resultOrError{Result: result, Err: err} + }() - chResult := make(chan resultOrError, 1) - go func() { - result, err := store.Get(ctx, req) - chResult <- resultOrError{Result: result, Err: err} - }() + runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { + select { + case <-chResult: + t.Fatalf("expected Get to block") + case <-time.After(50 * time.Millisecond): + } + + store.lock.Lock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + store.lock.Unlock() + require.Equal(t, 1, e.requests) + }) + + runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) { + req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) select { case <-chResult: @@ -96,9 +110,10 @@ func TestStore_Get(t *testing.T) { e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] store.lock.Unlock() require.Equal(t, 1, e.requests) + }) - req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) - + runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) { + req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1")) var getResult resultOrError select { case getResult = <-chResult: @@ -107,17 +122,17 @@ func TestStore_Get(t *testing.T) { } require.NoError(t, getResult.Err) - require.Equal(t, uint64(24), getResult.Result.Index) + require.Equal(t, uint64(41), getResult.Result.Index) r, ok := getResult.Result.Value.(fakeResult) require.True(t, ok) require.Len(t, r.srvs, 2) - require.Equal(t, uint64(24), r.index) + require.Equal(t, uint64(41), r.index) store.lock.Lock() defer store.lock.Unlock() require.Len(t, store.byKey, 1) - e = store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] require.Equal(t, 0, e.expiry.Index()) require.Equal(t, 0, e.requests)