Make useCache explicit everywhere in lock manager (#6035)

* Make useCache explicit everywhere in lock manager

This also clears up a case where we could insert into the cache when it
wasn't active (see the sketch below).

* Address feedback
Authored by Jeff Mitchell on 2019-01-14 11:58:03 -05:00, committed by Brian Kassouf
parent c07c946b7b
commit 4dfb25927a
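
The pattern this commit applies throughout the lock manager is sketched below. This is an illustrative reconstruction, not the Vault source: the lockManager type here is reduced to the two fields the diff actually exercises (a useCache flag and a sync.Map-style cache), and loadFromStorage is a hypothetical stand-in for the real storage read.

// Illustrative sketch only: the cache is consulted and written solely when
// useCache is true, so a disabled cache can never accumulate entries.
package main

import (
	"fmt"
	"sync"
)

type policy struct{ name string }

type lockManager struct {
	useCache bool
	cache    sync.Map // name -> *policy
}

// getPolicy checks the cache only when it is active; otherwise it always
// falls through to the (hypothetical) storage lookup.
func (lm *lockManager) getPolicy(name string) *policy {
	if lm.useCache {
		if pRaw, ok := lm.cache.Load(name); ok {
			return pRaw.(*policy)
		}
	}

	p := loadFromStorage(name) // stand-in for the real storage read

	// Only an active cache may be populated; this is the case the commit
	// message calls out ("insert into the cache when it wasn't active").
	if lm.useCache {
		lm.cache.Store(name, p)
	}
	return p
}

// loadFromStorage is a hypothetical placeholder for the storage backend.
func loadFromStorage(name string) *policy { return &policy{name: name} }

func main() {
	lm := &lockManager{useCache: false}
	p := lm.getPolicy("aes-key")
	_, cached := lm.cache.Load("aes-key")
	fmt.Println(p.name, cached) // "aes-key false": nothing was cached
}

Guarding every cache access behind the same flag keeps the invariant local to each call site instead of relying on the cache never having been populated elsewhere.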


@@ -74,8 +74,10 @@ func (lm *LockManager) CacheActive() bool {
 }
 
 func (lm *LockManager) InvalidatePolicy(name string) {
-	lm.cache.Delete(name)
+	if lm.useCache {
+		lm.cache.Delete(name)
+	}
 }
 
 // RestorePolicy acquires an exclusive lock on the policy name and restores the
 // given policy along with the archive.
@@ -103,13 +105,18 @@ func (lm *LockManager) RestorePolicy(ctx context.Context, storage logical.Storage
 	lock.Lock()
 	defer lock.Unlock()
 
+	var ok bool
+	var pRaw interface{}
+
 	// If the policy is in cache and 'force' is not specified, error out. Anywhere
 	// that would put it in the cache will also be protected by the mutex above,
 	// so we don't need to re-check the cache later.
-	pRaw, ok := lm.cache.Load(name)
-	if ok && !force {
-		return fmt.Errorf("key %q already exists", name)
-	}
+	if lm.useCache {
+		pRaw, ok = lm.cache.Load(name)
+		if ok && !force {
+			return fmt.Errorf("key %q already exists", name)
+		}
+	}
 
 	// Conditionally look up the policy from storage, depending on the use of
 	// 'force' and if the policy was found in cache.
@@ -168,7 +175,9 @@ func (lm *LockManager) RestorePolicy(ctx context.Context, storage logical.Storage
 	keyData.Policy.l = new(sync.RWMutex)
 
 	// Update the cache to contain the restored policy
-	lm.cache.Store(name, keyData.Policy)
+	if lm.useCache {
+		lm.cache.Store(name, keyData.Policy)
+	}
 
 	return nil
 }
@@ -183,7 +192,12 @@ func (lm *LockManager) BackupPolicy(ctx context.Context, storage logical.Storage
 	lock.Lock()
 	defer lock.Unlock()
 
-	pRaw, ok := lm.cache.Load(name)
+	var ok bool
+	var pRaw interface{}
+	if lm.useCache {
+		pRaw, ok = lm.cache.Load(name)
+	}
+
 	if ok {
 		p = pRaw.(*Policy)
 		p.l.Lock()
@@ -216,9 +230,13 @@ func (lm *LockManager) BackupPolicy(ctx context.Context, storage logical.Storage
 func (lm *LockManager) GetPolicy(ctx context.Context, req PolicyRequest) (retP *Policy, retUpserted bool, retErr error) {
 	var p *Policy
 	var err error
+	var ok bool
+	var pRaw interface{}
 
 	// Check if it's in our cache. If so, return right away.
-	pRaw, ok := lm.cache.Load(req.Name)
+	if lm.useCache {
+		pRaw, ok = lm.cache.Load(req.Name)
+	}
 	if ok {
 		p = pRaw.(*Policy)
 		if atomic.LoadUint32(&p.deleted) == 1 {
@@ -241,6 +259,7 @@ func (lm *LockManager) GetPolicy(ctx context.Context, req PolicyRequest) (retP *
 		// themselves
 		case lm.useCache:
 			lock.Unlock()
+
 		// If not using the cache, if we aren't returning a policy the caller
 		// doesn't have a lock, so we must unlock
 		case retP == nil:
@@ -249,7 +268,9 @@ func (lm *LockManager) GetPolicy(ctx context.Context, req PolicyRequest) (retP *
 	}
 
 	// Check the cache again
-	pRaw, ok = lm.cache.Load(req.Name)
+	if lm.useCache {
+		pRaw, ok = lm.cache.Load(req.Name)
+	}
 	if ok {
 		p = pRaw.(*Policy)
 		if atomic.LoadUint32(&p.deleted) == 1 {
@@ -378,6 +399,8 @@ func (lm *LockManager) GetPolicy(ctx context.Context, req PolicyRequest) (retP *
 func (lm *LockManager) DeletePolicy(ctx context.Context, storage logical.Storage, name string) error {
 	var p *Policy
 	var err error
+	var ok bool
+	var pRaw interface{}
 
 	// We may be writing to disk, so grab an exclusive lock. This prevents bad
 	// behavior when the cache is turned off. We also lock the shared policy
@@ -386,7 +409,9 @@ func (lm *LockManager) DeletePolicy(ctx context.Context, storage logical.Storage
 	lock.Lock()
 	defer lock.Unlock()
 
-	pRaw, ok := lm.cache.Load(name)
+	if lm.useCache {
+		pRaw, ok = lm.cache.Load(name)
+	}
 	if ok {
 		p = pRaw.(*Policy)
 		p.l.Lock()
@@ -409,7 +434,9 @@ func (lm *LockManager) DeletePolicy(ctx context.Context, storage logical.Storage
 	atomic.StoreUint32(&p.deleted, 1)
 
-	lm.cache.Delete(name)
+	if lm.useCache {
+		lm.cache.Delete(name)
+	}
 
 	err = storage.Delete(ctx, "policy/"+name)
 	if err != nil {
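
A note on the pRaw/ok pattern that repeats above: the Load, Store, and Delete calls on lm.cache match the standard library's sync.Map API, where Load returns the stored value together with a boolean reporting whether the key was present. Declaring var ok bool and var pRaw interface{} ahead of the if lm.useCache block means that when the cache is inactive the Load is simply skipped, ok keeps its zero value of false, and the code falls through to storage. A minimal, standalone illustration of those semantics (independent of Vault):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var cache sync.Map

	// Load on a missing key: the boolean is false and the value is nil,
	// mirroring how the lock manager can skip Load entirely when the cache
	// is inactive and rely on ok staying false.
	v, ok := cache.Load("policy/foo")
	fmt.Println(v, ok) // <nil> false

	cache.Store("policy/foo", "some-policy")
	v, ok = cache.Load("policy/foo")
	fmt.Println(v, ok) // some-policy true

	cache.Delete("policy/foo")
	_, ok = cache.Load("policy/foo")
	fmt.Println(ok) // false
}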