diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 8cdf0a0a8..f5c37173d 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -20,7 +20,9 @@ type entry struct { materializer *Materializer expiry *ttlcache.Entry stop func() - // TODO: add watchCount + // notifier is the count of active Notify goroutines. This entry will + // remain in the store as long as this count remains > 0. + notifier int } // TODO: start expiration loop @@ -52,11 +54,13 @@ func (s *Store) Run(ctx context.Context) { he := timer.Entry s.expiryHeap.Remove(he.Index()) - // TODO: expiry here - // if e.watchCount == 0 {} e := s.byKey[he.Key()] - e.stop() - //delete(s.entries, entry.Key()) + + // Only stop the materializer if there are no active calls to Notify. + if e.notifier == 0 { + e.stop() + delete(s.byKey, he.Key()) + } s.lock.Unlock() } @@ -77,11 +81,11 @@ func (s *Store) Get( // TODO: only the Index field of ResultMeta is relevant, return a result struct instead. ) (interface{}, cache.ResultMeta, error) { info := req.CacheInfo() - key := makeEntryKey(typ, info) - e := s.getEntry(key, req.NewMaterializer) - - // TODO: requires a lock to update the heap. - //s.expiryHeap.Update(e.expiry.Index(), info.Timeout + ttl) + e := s.getEntry(getEntryOpts{ + typ: typ, + info: info, + newMaterializer: req.NewMaterializer, + }) // TODO: no longer any need to return cache.FetchResult from Materializer.Fetch // TODO: pass context instead of Done chan, also replaces Timeout param @@ -101,15 +105,26 @@ func (s *Store) Notify( correlationID string, updateCh chan<- cache.UpdateEvent, ) error { - // TODO: set entry to not expire until ctx is cancelled. - info := req.CacheInfo() - key := makeEntryKey(typ, info) - e := s.getEntry(key, req.NewMaterializer) - - var index uint64 + e := s.getEntry(getEntryOpts{ + typ: typ, + info: info, + newMaterializer: req.NewMaterializer, + notifier: true, + }) go func() { + index := info.MinIndex + + // TODO: better way to handle this? + defer func() { + s.lock.Lock() + e.notifier-- + s.byKey[e.expiry.Key()] = e + s.expiryHeap.Update(e.expiry.Index(), idleTTL) + s.lock.Unlock() + }() + for { result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index}) switch { @@ -140,28 +155,33 @@ func (s *Store) Notify( return nil } -func (s *Store) getEntry(key string, newMat func() *Materializer) entry { - s.lock.RLock() - e, ok := s.byKey[key] - s.lock.RUnlock() - if ok { - return e - } +func (s *Store) getEntry(opts getEntryOpts) entry { + info := opts.info + key := makeEntryKey(opts.typ, info) s.lock.Lock() defer s.lock.Unlock() - // Check again after acquiring the write lock, in case we raced to create the entry. - e, ok = s.byKey[key] + e, ok := s.byKey[key] if ok { + s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL) + if opts.notifier { + e.notifier++ + } return e } - e = entry{materializer: newMat()} - ctx, cancel := context.WithCancel(context.Background()) - e.stop = cancel - go e.materializer.Run(ctx) + mat := opts.newMaterializer() + go mat.Run(ctx) + e = entry{ + materializer: mat, + stop: cancel, + expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL), + } + if opts.notifier { + e.notifier++ + } s.byKey[key] = e return e } @@ -175,3 +195,10 @@ type Request interface { cache.Request NewMaterializer() *Materializer } + +type getEntryOpts struct { + typ string + info cache.RequestInfo + newMaterializer func() *Materializer + notifier bool +}