From f4573177bacf29fe0747e58d97b203f4cd1f8fdf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 23 Feb 2021 12:18:44 -0500 Subject: [PATCH] submatview: track requests instead of notifiers And only start expiration time when the last request ends. This makes tracking expiry simpler, and ensures that no entry can be expired while there are active requests. --- agent/submatview/materializer.go | 1 + agent/submatview/store.go | 108 +++++++++++++++---------------- agent/submatview/store_test.go | 10 ++- 3 files changed, 60 insertions(+), 59 deletions(-) diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 85ac163ba..e9c8fb0c2 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -32,6 +32,7 @@ type View interface { // separately and passed in in case the return type needs an Index field // populating. This allows implementations to not worry about maintaining // indexes seen during Update. + // TODO: remove error return value. Result(index uint64) (interface{}, error) // Reset the view to the zero state, done in preparation for receiving a new diff --git a/agent/submatview/store.go b/agent/submatview/store.go index f5c37173d..69bcf8d41 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -20,9 +20,9 @@ type entry struct { materializer *Materializer expiry *ttlcache.Entry stop func() - // notifier is the count of active Notify goroutines. This entry will + // requests is the count of active requests using this entry. This entry will // remain in the store as long as this count remains > 0. - notifier int + requests int } // TODO: start expiration loop @@ -56,8 +56,8 @@ func (s *Store) Run(ctx context.Context) { e := s.byKey[he.Key()] - // Only stop the materializer if there are no active calls to Notify. - if e.notifier == 0 { + // Only stop the materializer if there are no active requests. + if e.requests == 0 { e.stop() delete(s.byKey, he.Key()) } @@ -68,24 +68,23 @@ func (s *Store) Run(ctx context.Context) { } // TODO: godoc -var idleTTL = 20 * time.Minute +type Request interface { + cache.Request + NewMaterializer() *Materializer + Type() string +} // Get a value from the store, blocking if the store has not yet seen the // req.Index value. // See agent/cache.Cache.Get for complete documentation. func (s *Store) Get( ctx context.Context, - // TODO: remove typ param, make it part of the Request interface. - typ string, req Request, // TODO: only the Index field of ResultMeta is relevant, return a result struct instead. ) (interface{}, cache.ResultMeta, error) { info := req.CacheInfo() - e := s.getEntry(getEntryOpts{ - typ: typ, - info: info, - newMaterializer: req.NewMaterializer, - }) + key, e := s.getEntry(req) + defer s.releaseEntry(key) // TODO: no longer any need to return cache.FetchResult from Materializer.Fetch // TODO: pass context instead of Done chan, also replaces Timeout param @@ -93,6 +92,7 @@ func (s *Store) Get( MinIndex: info.MinIndex, Timeout: info.Timeout, }) + return result.Value, cache.ResultMeta{Index: result.Index}, err } @@ -100,31 +100,17 @@ func (s *Store) Get( // See agent/cache.Cache.Notify for complete documentation. func (s *Store) Notify( ctx context.Context, - typ string, req Request, correlationID string, updateCh chan<- cache.UpdateEvent, ) error { info := req.CacheInfo() - e := s.getEntry(getEntryOpts{ - typ: typ, - info: info, - newMaterializer: req.NewMaterializer, - notifier: true, - }) + key, e := s.getEntry(req) go func() { + defer s.releaseEntry(key) + index := info.MinIndex - - // TODO: better way to handle this? - defer func() { - s.lock.Lock() - e.notifier-- - s.byKey[e.expiry.Key()] = e - s.expiryHeap.Update(e.expiry.Index(), idleTTL) - s.lock.Unlock() - }() - for { result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index}) switch { @@ -149,56 +135,66 @@ func (s *Store) Notify( case <-ctx.Done(): return } - } }() return nil } -func (s *Store) getEntry(opts getEntryOpts) entry { - info := opts.info - key := makeEntryKey(opts.typ, info) +// getEntry from the store, and increment the requests counter. releaseEntry +// must be called when the request is finished to decrement the counter. +func (s *Store) getEntry(req Request) (string, entry) { + info := req.CacheInfo() + key := makeEntryKey(req.Type(), info) s.lock.Lock() defer s.lock.Unlock() e, ok := s.byKey[key] if ok { - s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL) - if opts.notifier { - e.notifier++ - } - return e + e.requests++ + s.byKey[key] = e + return key, e } ctx, cancel := context.WithCancel(context.Background()) - mat := opts.newMaterializer() + mat := req.NewMaterializer() go mat.Run(ctx) e = entry{ materializer: mat, stop: cancel, - expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL), - } - if opts.notifier { - e.notifier++ + requests: 1, } s.byKey[key] = e - return e + return key, e +} + +// idleTTL is the duration of time an entry should remain in the Store after the +// last request for that entry has been terminated. +var idleTTL = 20 * time.Minute + +// releaseEntry decrements the request count and starts an expiry timer if the +// count has reached 0. Must be called once for every call to getEntry. +func (s *Store) releaseEntry(key string) { + s.lock.Lock() + defer s.lock.Unlock() + e := s.byKey[key] + e.requests-- + s.byKey[key] = e + + if e.requests > 0 { + return + } + + if e.expiry.Index() == ttlcache.NotIndexed { + e.expiry = s.expiryHeap.Add(key, idleTTL) + s.byKey[key] = e + return + } + + s.expiryHeap.Update(e.expiry.Index(), idleTTL) } // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. func makeEntryKey(typ string, r cache.RequestInfo) string { return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) } - -type Request interface { - cache.Request - NewMaterializer() *Materializer -} - -type getEntryOpts struct { - typ string - info cache.RequestInfo - newMaterializer func() *Materializer - notifier bool -} diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index a567720fa..97eed9ccc 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -30,7 +30,7 @@ func TestStore_Get_Fresh(t *testing.T) { newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(22, 2, "srv1")) - result, md, err := store.Get(ctx, "test", req) + result, md, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(22), md.Index) @@ -39,11 +39,11 @@ func TestStore_Get_Fresh(t *testing.T) { require.Len(t, r.srvs, 2) require.Equal(t, uint64(22), r.index) + store.lock.Lock() require.Len(t, store.byKey, 1) - e := store.byKey[makeEntryKey("test", req.CacheInfo())] + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] require.Equal(t, 0, e.expiry.Index()) - store.lock.Lock() defer store.lock.Unlock() require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) } @@ -80,6 +80,10 @@ func (r *fakeRequest) NewMaterializer() *Materializer { }) } +func (r *fakeRequest) Type() string { + return fmt.Sprintf("%T", r) +} + type fakeView struct { srvs map[string]*pbservice.CheckServiceNode }