submatview: rough outline of the Get and Notify methods.

This commit is contained in:
Daniel Nephin 2021-02-22 14:29:45 -05:00
parent c23e98a5e6
commit e4c503c28e
2 changed files with 82 additions and 46 deletions

View File

@ -204,9 +204,8 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{}) m.updateCh = make(chan struct{})
} }
// Fetch implements the logic a StreamingCacheType will need during it's Fetch // Fetch the value stored in the View. Fetch blocks until the index of the View
// call. Cache types that use streaming should just be able to proxy to this // is greater than opts.MinIndex, or the context is cancelled.
// once they have a subscription object and return it's results directly.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) { func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult var result cache.FetchResult

View File

@ -12,26 +12,48 @@ import (
type Store struct { type Store struct {
lock sync.RWMutex lock sync.RWMutex
byKey map[string]*Materializer byKey map[string]entry
expiryHeap *ttlcache.ExpiryHeap expiryHeap *ttlcache.ExpiryHeap
} }
type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
}
// TODO: start expiration loop
func NewStore() *Store { func NewStore() *Store {
return &Store{ return &Store{
byKey: make(map[string]*Materializer), byKey: make(map[string]entry),
expiryHeap: ttlcache.NewExpiryHeap(), expiryHeap: ttlcache.NewExpiryHeap(),
} }
} }
var ttl = 20 * time.Minute
// Get a value from the store, blocking if the store has not yet seen the // Get a value from the store, blocking if the store has not yet seen the
// req.Index value. // req.Index value.
// See agent/cache.Cache.Get for complete documentation. // See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get( func (s *Store) Get(
ctx context.Context, ctx context.Context,
typ string, typ string,
req cache.Request, req Request,
) (result interface{}, meta cache.ResultMeta, err error) { // TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
return nil, cache.ResultMeta{}, nil ) (interface{}, cache.ResultMeta, error) {
info := req.CacheInfo()
key := makeEntryKey(typ, info)
e := s.getEntry(key, req.NewMaterializer)
// TODO: requires a lock to update the heap.
s.expiryHeap.Update(e.expiry.Index(), ttl)
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{
MinIndex: info.MinIndex,
Timeout: info.Timeout,
})
return result.Value, cache.ResultMeta{Index: result.Index}, err
} }
// Notify the updateCh when there are updates to the entry identified by req. // Notify the updateCh when there are updates to the entry identified by req.
@ -39,60 +61,75 @@ func (s *Store) Get(
func (s *Store) Notify( func (s *Store) Notify(
ctx context.Context, ctx context.Context,
typ string, typ string,
req cache.Request, req Request,
correlationID string, correlationID string,
updateCh chan<- cache.UpdateEvent, updateCh chan<- cache.UpdateEvent,
) error { ) error {
// TODO: set entry to not expire until ctx is cancelled.
info := req.CacheInfo()
key := makeEntryKey(typ, info)
e := s.getEntry(key, req.NewMaterializer)
var index uint64
go func() {
for {
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
switch {
case ctx.Err() != nil:
return
case err != nil:
// TODO: cache.Notify sends errors on updateCh, should this do the same?
// It seems like only fetch errors would ever get sent along.
// TODO: log warning
continue
}
index = result.Index
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
Err: err,
}
select {
case updateCh <- u:
case <-ctx.Done():
return
}
}
}()
return nil return nil
} }
func (s *Store) getMaterializer(opts GetOptions) *Materializer { func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
// TODO: use makeEntryKey
var key string
s.lock.RLock() s.lock.RLock()
mat, ok := s.byKey[key] e, ok := s.byKey[key]
s.lock.RUnlock() s.lock.RUnlock()
if ok { if ok {
return mat return e
} }
s.lock.Lock() s.lock.Lock()
mat, ok = s.byKey[key] defer s.lock.Unlock()
if !ok { e, ok = s.byKey[key]
mat = opts.NewMaterializer() if ok {
s.byKey[opts.Key] = mat return e
} }
s.lock.Unlock()
return mat e = entry{materializer: newMat()}
s.byKey[key] = e
return e
} }
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(t, dc, token, key string) string { func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
} }
type GetOptions struct { type Request interface {
// TODO: needs to use makeEntryKey cache.Request
Key string NewMaterializer() *Materializer
// MinIndex is the index previously seen by the caller. If MinIndex>0 Fetch
// will not return until the index is >MinIndex, or Timeout is hit.
MinIndex uint64
// TODO: maybe remove and use a context deadline.
Timeout time.Duration
// NewMaterializer returns a new Materializer to be used if the store does
// not have one already running for the given key.
NewMaterializer func() *Materializer
}
type FetchResult struct {
// Value is the result of the fetch.
Value interface{}
// Index is the corresponding index value for this data.
Index uint64
} }