submatview: track requests instead of notifiers

And only start expiration time when the last request ends. This makes tracking expiry simpler, and
ensures that no entry can be expired while there are active requests.
This commit is contained in:
Daniel Nephin 2021-02-23 12:18:44 -05:00
parent 468469bd27
commit f4573177ba
3 changed files with 60 additions and 59 deletions

View File

@ -32,6 +32,7 @@ type View interface {
// separately and passed in in case the return type needs an Index field // separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining // populating. This allows implementations to not worry about maintaining
// indexes seen during Update. // indexes seen during Update.
// TODO: remove error return value.
Result(index uint64) (interface{}, error) Result(index uint64) (interface{}, error)
// Reset the view to the zero state, done in preparation for receiving a new // Reset the view to the zero state, done in preparation for receiving a new

View File

@ -20,9 +20,9 @@ type entry struct {
materializer *Materializer materializer *Materializer
expiry *ttlcache.Entry expiry *ttlcache.Entry
stop func() stop func()
// notifier is the count of active Notify goroutines. This entry will // requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0. // remain in the store as long as this count remains > 0.
notifier int requests int
} }
// TODO: start expiration loop // TODO: start expiration loop
@ -56,8 +56,8 @@ func (s *Store) Run(ctx context.Context) {
e := s.byKey[he.Key()] e := s.byKey[he.Key()]
// Only stop the materializer if there are no active calls to Notify. // Only stop the materializer if there are no active requests.
if e.notifier == 0 { if e.requests == 0 {
e.stop() e.stop()
delete(s.byKey, he.Key()) delete(s.byKey, he.Key())
} }
@ -68,24 +68,23 @@ func (s *Store) Run(ctx context.Context) {
} }
// TODO: godoc // TODO: godoc
var idleTTL = 20 * time.Minute type Request interface {
cache.Request
NewMaterializer() *Materializer
Type() string
}
// Get a value from the store, blocking if the store has not yet seen the // Get a value from the store, blocking if the store has not yet seen the
// req.Index value. // req.Index value.
// See agent/cache.Cache.Get for complete documentation. // See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get( func (s *Store) Get(
ctx context.Context, ctx context.Context,
// TODO: remove typ param, make it part of the Request interface.
typ string,
req Request, req Request,
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead. // TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
) (interface{}, cache.ResultMeta, error) { ) (interface{}, cache.ResultMeta, error) {
info := req.CacheInfo() info := req.CacheInfo()
e := s.getEntry(getEntryOpts{ key, e := s.getEntry(req)
typ: typ, defer s.releaseEntry(key)
info: info,
newMaterializer: req.NewMaterializer,
})
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch // TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param // TODO: pass context instead of Done chan, also replaces Timeout param
@ -93,6 +92,7 @@ func (s *Store) Get(
MinIndex: info.MinIndex, MinIndex: info.MinIndex,
Timeout: info.Timeout, Timeout: info.Timeout,
}) })
return result.Value, cache.ResultMeta{Index: result.Index}, err return result.Value, cache.ResultMeta{Index: result.Index}, err
} }
@ -100,31 +100,17 @@ func (s *Store) Get(
// See agent/cache.Cache.Notify for complete documentation. // See agent/cache.Cache.Notify for complete documentation.
func (s *Store) Notify( func (s *Store) Notify(
ctx context.Context, ctx context.Context,
typ string,
req Request, req Request,
correlationID string, correlationID string,
updateCh chan<- cache.UpdateEvent, updateCh chan<- cache.UpdateEvent,
) error { ) error {
info := req.CacheInfo() info := req.CacheInfo()
e := s.getEntry(getEntryOpts{ key, e := s.getEntry(req)
typ: typ,
info: info,
newMaterializer: req.NewMaterializer,
notifier: true,
})
go func() { go func() {
defer s.releaseEntry(key)
index := info.MinIndex index := info.MinIndex
// TODO: better way to handle this?
defer func() {
s.lock.Lock()
e.notifier--
s.byKey[e.expiry.Key()] = e
s.expiryHeap.Update(e.expiry.Index(), idleTTL)
s.lock.Unlock()
}()
for { for {
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index}) result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
switch { switch {
@ -149,56 +135,66 @@ func (s *Store) Notify(
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
}() }()
return nil return nil
} }
func (s *Store) getEntry(opts getEntryOpts) entry { // getEntry from the store, and increment the requests counter. releaseEntry
info := opts.info // must be called when the request is finished to decrement the counter.
key := makeEntryKey(opts.typ, info) func (s *Store) getEntry(req Request) (string, entry) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
e, ok := s.byKey[key] e, ok := s.byKey[key]
if ok { if ok {
s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL) e.requests++
if opts.notifier { s.byKey[key] = e
e.notifier++ return key, e
}
return e
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
mat := opts.newMaterializer() mat := req.NewMaterializer()
go mat.Run(ctx) go mat.Run(ctx)
e = entry{ e = entry{
materializer: mat, materializer: mat,
stop: cancel, stop: cancel,
expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL), requests: 1,
}
if opts.notifier {
e.notifier++
} }
s.byKey[key] = e s.byKey[key] = e
return e return key, e
}
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated.
var idleTTL = 20 * time.Minute
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to getEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), idleTTL)
} }
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string { func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
} }
type Request interface {
cache.Request
NewMaterializer() *Materializer
}
type getEntryOpts struct {
typ string
info cache.RequestInfo
newMaterializer func() *Materializer
notifier bool
}

View File

@ -30,7 +30,7 @@ func TestStore_Get_Fresh(t *testing.T) {
newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1")) newEventServiceHealthRegister(22, 2, "srv1"))
result, md, err := store.Get(ctx, "test", req) result, md, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(22), md.Index) require.Equal(t, uint64(22), md.Index)
@ -39,11 +39,11 @@ func TestStore_Get_Fresh(t *testing.T) {
require.Len(t, r.srvs, 2) require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index) require.Equal(t, uint64(22), r.index)
store.lock.Lock()
require.Len(t, store.byKey, 1) require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey("test", req.CacheInfo())] e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index()) require.Equal(t, 0, e.expiry.Index())
store.lock.Lock()
defer store.lock.Unlock() defer store.lock.Unlock()
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
} }
@ -80,6 +80,10 @@ func (r *fakeRequest) NewMaterializer() *Materializer {
}) })
} }
func (r *fakeRequest) Type() string {
return fmt.Sprintf("%T", r)
}
type fakeView struct { type fakeView struct {
srvs map[string]*pbservice.CheckServiceNode srvs map[string]*pbservice.CheckServiceNode
} }