Remove core restriction in cache and turn it into an active/standby restriction instead (#3849)

Authored by Jeff Mitchell on 2018-01-25 22:21:51 -05:00; committed by GitHub
parent c7580b2961
commit 60e2209532
5 changed files with 256 additions and 325 deletions
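In short, this change drops the cache's per-key "core/" exception logic and replaces it with a single process-wide switch: the cache is created disabled and is only turned on once the node knows whether it is active or standby. Below is a minimal, self-contained sketch of that atomic on/off gate; the names toggleCache and newToggleCache are invented for illustration and are not part of Vault.

package main

import (
	"fmt"
	"sync/atomic"
)

// toggleCache is a hypothetical stand-in for the pattern the commit adopts:
// every operation checks an atomic flag first and bypasses the cache while it
// is zero, so a freshly constructed cache "fails safe" to disabled.
type toggleCache struct {
	enabled *uint32
	data    map[string]string
}

func newToggleCache() *toggleCache {
	return &toggleCache{
		enabled: new(uint32), // zero value means disabled
		data:    make(map[string]string),
	}
}

func (c *toggleCache) SetEnabled(on bool) {
	if on {
		atomic.StoreUint32(c.enabled, 1)
		return
	}
	atomic.StoreUint32(c.enabled, 0)
}

func (c *toggleCache) Put(key, val string) {
	if atomic.LoadUint32(c.enabled) == 0 {
		return // disabled: skip the caching layer entirely
	}
	c.data[key] = val
}

func main() {
	c := newToggleCache()
	c.Put("foo", "bar") // no-op, the cache starts disabled
	c.SetEnabled(true)
	c.Put("foo", "bar") // cached now
	fmt.Println(c.data) // map[foo:bar]
}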


@ -2,9 +2,8 @@ package physical
import (
	"context"
-	"strings"
+	"sync/atomic"
-	iradix "github.com/hashicorp/go-immutable-radix"
"github.com/hashicorp/golang-lru"
"github.com/hashicorp/vault/helper/locksutil"
log "github.com/mgutz/logxi/v1"
@ -12,7 +11,7 @@ import (
const (
// DefaultCacheSize is used if no cache size is specified for NewCache
-	DefaultCacheSize = 32 * 1024
+	DefaultCacheSize = 128 * 1024
)
// Cache is used to wrap an underlying physical backend
@ -20,11 +19,11 @@ const (
// Vault are for policy objects so there is a large read reduction
// by using a simple write-through cache.
type Cache struct {
-	backend    Backend
-	lru        *lru.TwoQueueCache
-	locks      []*locksutil.LockEntry
-	exceptions *iradix.Tree
-	logger     log.Logger
+	backend Backend
+	lru     *lru.TwoQueueCache
+	locks   []*locksutil.LockEntry
+	logger  log.Logger
+	enabled *uint32
}
// TransactionalCache is a Cache that wraps the physical that is transactional
@ -34,49 +33,51 @@ type TransactionalCache struct {
}
// Verify Cache satisfies the correct interfaces
-var _ Purgable = (*Cache)(nil)
+var _ ToggleablePurgemonster = (*Cache)(nil)
+var _ ToggleablePurgemonster = (*TransactionalCache)(nil)
var _ Backend = (*Cache)(nil)
var _ Transactional = (*TransactionalCache)(nil)
-var _ Purgable = (*TransactionalCache)(nil)
// NewCache returns a physical cache of the given size.
// If no size is provided, the default size is used.
-func NewCache(b Backend, size int, coreExceptions []string, logger log.Logger) *Cache {
+func NewCache(b Backend, size int, logger log.Logger) *Cache {
if logger.IsTrace() {
logger.Trace("physical/cache: creating LRU cache", "size", size)
}
if size <= 0 {
size = DefaultCacheSize
}
-	cacheExceptions := iradix.New()
-	for _, key := range coreExceptions {
-		cacheValue := true
-		if strings.HasPrefix(key, "!") {
-			key = strings.TrimPrefix(key, "!")
-			cacheValue = false
-		}
-		cacheExceptions, _, _ = cacheExceptions.Insert([]byte(key), cacheValue)
-	}
cache, _ := lru.New2Q(size)
c := &Cache{
-		backend:    b,
-		lru:        cache,
-		locks:      locksutil.CreateLocks(),
-		exceptions: cacheExceptions,
-		logger:     logger,
+		backend: b,
+		lru:     cache,
+		locks:   locksutil.CreateLocks(),
+		logger:  logger,
+		// This fails safe.
+		enabled: new(uint32),
}
return c
}
-func NewTransactionalCache(b Backend, size int, coreExceptions []string, logger log.Logger) *TransactionalCache {
+func NewTransactionalCache(b Backend, size int, logger log.Logger) *TransactionalCache {
c := &TransactionalCache{
-		Cache:         NewCache(b, size, coreExceptions, logger),
+		Cache:         NewCache(b, size, logger),
Transactional: b.(Transactional),
}
return c
}
+// SetEnabled is used to toggle whether the cache is on or off. It must be
+// called with true to actually activate the cache after creation.
+func (c *Cache) SetEnabled(enabled bool) {
+	if enabled {
+		atomic.StoreUint32(c.enabled, 1)
+		return
+	}
+	atomic.StoreUint32(c.enabled, 0)
+}
// Purge is used to clear the cache
func (c *Cache) Purge(ctx context.Context) {
// Lock the world
@ -89,30 +90,30 @@ func (c *Cache) Purge(ctx context.Context) {
}
func (c *Cache) Put(ctx context.Context, entry *Entry) error {
+	if atomic.LoadUint32(c.enabled) == 0 {
+		return c.backend.Put(ctx, entry)
+	}
lock := locksutil.LockForKey(c.locks, entry.Key)
lock.Lock()
defer lock.Unlock()
err := c.backend.Put(ctx, entry)
-	if err == nil && c.shouldCache(entry.Key) {
+	if err == nil {
c.lru.Add(entry.Key, entry)
}
return err
}
func (c *Cache) Get(ctx context.Context, key string) (*Entry, error) {
+	if atomic.LoadUint32(c.enabled) == 0 {
+		return c.backend.Get(ctx, key)
+	}
lock := locksutil.LockForKey(c.locks, key)
lock.RLock()
defer lock.RUnlock()
-	// We do NOT cache negative results for keys in the 'core/' prefix
-	// otherwise we risk certain race conditions upstream. The primary issue is
-	// with the HA mode, we could potentially negatively cache the leader entry
-	// and cause leader discovery to fail.
-	if !c.shouldCache(key) {
-		return c.backend.Get(ctx, key)
-	}
// Check the LRU first
if raw, ok := c.lru.Get(key); ok {
if raw == nil {
@ -136,12 +137,16 @@ func (c *Cache) Get(ctx context.Context, key string) (*Entry, error) {
}
func (c *Cache) Delete(ctx context.Context, key string) error {
+	if atomic.LoadUint32(c.enabled) == 0 {
+		return c.backend.Delete(ctx, key)
+	}
lock := locksutil.LockForKey(c.locks, key)
lock.Lock()
defer lock.Unlock()
err := c.backend.Delete(ctx, key)
-	if err == nil && c.shouldCache(key) {
+	if err == nil {
c.lru.Remove(key)
}
return err
@ -170,8 +175,8 @@ func (c *TransactionalCache) Transaction(ctx context.Context, txns []*TxnEntry)
return err
}
-	for _, txn := range txns {
-		if c.shouldCache(txn.Entry.Key) {
+	if atomic.LoadUint32(c.enabled) == 1 {
+		for _, txn := range txns {
switch txn.Operation {
case PutOperation:
c.lru.Add(txn.Entry.Key, txn.Entry)
@ -183,20 +188,3 @@ func (c *TransactionalCache) Transaction(ctx context.Context, txns []*TxnEntry)
return nil
}
-// shouldCache checks for any cache exceptions
-func (c *Cache) shouldCache(key string) bool {
-	// prefix match if nested under core/
-	if strings.HasPrefix(key, "core/") {
-		if prefix, val, found := c.exceptions.Root().LongestPrefix([]byte(key)); found {
-			strPrefix := string(prefix)
-			if strings.HasSuffix(strPrefix, "/") || strPrefix == key {
-				return val.(bool)
-			}
-		}
-		// default for core/ values is false
-		return false
-	}
-	// default is true
-	return true
-}
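For context on what is deleted above: shouldCache consulted an immutable radix tree of exceptions, where an entry ending in "/" acted as a prefix rule, a leading "!" negated it, and keys under core/ defaulted to uncached. The standalone sketch below reproduces that lookup with the same go-immutable-radix calls and the exception list used by the removed TestCache_CoreExceptions test; buildExceptions is a made-up helper name for this example.

package main

import (
	"fmt"
	"strings"

	iradix "github.com/hashicorp/go-immutable-radix"
)

// buildExceptions mirrors the removed NewCache setup: a "!" prefix marks a
// path that must not be cached; everything else is an allow rule.
func buildExceptions(coreExceptions []string) *iradix.Tree {
	tree := iradix.New()
	for _, key := range coreExceptions {
		cacheValue := true
		if strings.HasPrefix(key, "!") {
			key = strings.TrimPrefix(key, "!")
			cacheValue = false
		}
		tree, _, _ = tree.Insert([]byte(key), cacheValue)
	}
	return tree
}

// shouldCache mirrors the removed method: longest-prefix match under core/,
// where only whole-key matches or rules ending in "/" apply.
func shouldCache(exceptions *iradix.Tree, key string) bool {
	if strings.HasPrefix(key, "core/") {
		if prefix, val, found := exceptions.Root().LongestPrefix([]byte(key)); found {
			strPrefix := string(prefix)
			if strings.HasSuffix(strPrefix, "/") || strPrefix == key {
				return val.(bool)
			}
		}
		// default for core/ values is false
		return false
	}
	return true
}

func main() {
	ex := buildExceptions([]string{"core/bar", "!core/baz/", "core/baz/zzz"})
	fmt.Println(shouldCache(ex, "core/foo"))     // false: core/ defaults to uncached
	fmt.Println(shouldCache(ex, "core/bar"))     // true: explicit exception
	fmt.Println(shouldCache(ex, "core/baz/aaa")) // false: negated prefix rule
	fmt.Println(shouldCache(ex, "core/baz/zzz")) // true: exact entry overrides the prefix
	fmt.Println(shouldCache(ex, "secret/foo"))   // true: non-core keys are always cached
}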


@ -16,7 +16,7 @@ func TestCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
-	cache := physical.NewCache(inm, 0, nil, logger)
+	cache := physical.NewCache(inm, 0, logger)
physical.ExerciseBackend(t, cache)
physical.ExerciseBackend_ListPrefix(t, cache)
}
@ -28,7 +28,8 @@ func TestCache_Purge(t *testing.T) {
if err != nil {
t.Fatal(err)
}
-	cache := physical.NewCache(inm, 0, nil, logger)
+	cache := physical.NewCache(inm, 0, logger)
+	cache.SetEnabled(true)
ent := &physical.Entry{
Key: "foo",
@ -41,6 +42,9 @@ func TestCache_Purge(t *testing.T) {
// Delete from under
inm.Delete(context.Background(), "foo")
+	if err != nil {
+		t.Fatal(err)
+	}
// Read should work
out, err := cache.Get(context.Background(), "foo")
@ -64,267 +68,202 @@ func TestCache_Purge(t *testing.T) {
}
}
func TestCache_ExcludeCore(t *testing.T) {
func TestCache_Disable(t *testing.T) {
logger := logformat.NewVaultLogger(log.LevelTrace)
inm, err := NewInmem(nil, logger)
if err != nil {
t.Fatal(err)
}
cache := physical.NewCache(inm, 0, logger)
cache := physical.NewCache(inm, 0, nil, logger)
var ent *physical.Entry
// First try normal handling
ent = &physical.Entry{
Key: "foo",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "foo",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "bar" {
t.Fatal("expected cached value")
}
// Now try core path
ent = &physical.Entry{
Key: "core/foo",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "core/foo",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/foo")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "foobar" {
t.Fatal("expected cached value")
}
// Now make sure looked-up values aren't added
ent = &physical.Entry{
Key: "core/zip",
Value: []byte("zap"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/zip")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "zap" {
t.Fatal("expected non-cached value")
}
ent = &physical.Entry{
Key: "core/zip",
Value: []byte("zipzap"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/zip")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "zipzap" {
t.Fatal("expected non-cached value")
}
}
func TestCache_ExcludeCoreTransactional(t *testing.T) {
logger := logformat.NewVaultLogger(log.LevelTrace)
inm, err := NewTransactionalInmem(nil, logger)
if err != nil {
t.Fatal(err)
}
cache := physical.NewTransactionalCache(inm, 0, nil, logger)
var ent *physical.TxnEntry
var entry *physical.Entry
// First try normal handling
ent = &physical.TxnEntry{
Operation: physical.PutOperation,
Entry: &physical.Entry{
disabledTests := func() {
ent := &physical.Entry{
Key: "foo",
Value: []byte("bar"),
},
}
err = inm.Put(context.Background(), ent)
if err != nil {
t.Fatalf("err: %v", err)
}
// Read should work
out, err := cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
err = inm.Delete(context.Background(), ent.Key)
if err != nil {
t.Fatal(err)
}
// Should not work
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("should not have key")
}
// Put through the cache and try again
err = cache.Put(context.Background(), ent)
if err != nil {
t.Fatalf("err: %v", err)
}
// Read should work in both
out, err = inm.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
err = inm.Delete(context.Background(), ent.Key)
if err != nil {
t.Fatal(err)
}
// Should not work
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("should not have key")
}
}
if err := cache.Transaction(context.Background(), []*physical.TxnEntry{ent}); err != nil {
t.Fatal(err)
}
ent = &physical.TxnEntry{
Operation: physical.PutOperation,
Entry: &physical.Entry{
enabledTests := func() {
ent := &physical.Entry{
Key: "foo",
Value: []byte("foobar"),
},
}
if err := inm.(physical.Transactional).Transaction(context.Background(), []*physical.TxnEntry{ent}); err != nil {
t.Fatal(err)
}
entry, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatal(err)
}
if string(entry.Value) != "bar" {
t.Fatal("expected cached value")
}
// Now try core path
ent = &physical.TxnEntry{
Operation: physical.PutOperation,
Entry: &physical.Entry{
Key: "core/foo",
Value: []byte("bar"),
},
}
if err := cache.Transaction(context.Background(), []*physical.TxnEntry{ent}); err != nil {
t.Fatal(err)
}
ent = &physical.TxnEntry{
Operation: physical.PutOperation,
Entry: &physical.Entry{
Key: "core/foo",
Value: []byte("foobar"),
},
}
if err := inm.(physical.Transactional).Transaction(context.Background(), []*physical.TxnEntry{ent}); err != nil {
t.Fatal(err)
}
entry, err = cache.Get(context.Background(), "core/foo")
if err != nil {
t.Fatal(err)
}
if string(entry.Value) != "foobar" {
t.Fatal("expected non-cached value")
}
}
func TestCache_CoreExceptions(t *testing.T) {
logger := logformat.NewVaultLogger(log.LevelTrace)
inm, err := NewInmem(nil, logger)
if err != nil {
t.Fatal(err)
}
cache := physical.NewCache(inm, 0, []string{"core/bar", "!core/baz/", "core/baz/zzz"}, logger)
var ent *physical.Entry
// Now try core path
ent = &physical.Entry{
Key: "core/foo",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "core/foo",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/foo")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "foobar" {
t.Fatal("expected non-cached value")
}
// Now try an exception
ent = &physical.Entry{
Key: "core/bar",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "core/bar",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/bar")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "bar" {
t.Fatal("expected cached value")
}
// another one
ent = &physical.Entry{
Key: "core/baz/aaa",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "core/baz/aaa",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/baz/aaa")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "foobar" {
t.Fatal("expected non-cached value")
}
// another one
ent = &physical.Entry{
Key: "core/baz/zzz",
Value: []byte("bar"),
}
if err := cache.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent = &physical.Entry{
Key: "core/baz/zzz",
Value: []byte("foobar"),
}
if err := inm.Put(context.Background(), ent); err != nil {
t.Fatal(err)
}
ent, err = cache.Get(context.Background(), "core/baz/zzz")
if err != nil {
t.Fatal(err)
}
if string(ent.Value) != "bar" {
t.Fatal("expected cached value")
}
err = inm.Put(context.Background(), ent)
if err != nil {
t.Fatalf("err: %v", err)
}
// Read should work
out, err := cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
err = inm.Delete(context.Background(), ent.Key)
if err != nil {
t.Fatal(err)
}
// Should work
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
// Put through the cache and try again
err = cache.Put(context.Background(), ent)
if err != nil {
t.Fatalf("err: %v", err)
}
// Read should work for both
out, err = inm.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
err = inm.Delete(context.Background(), ent.Key)
if err != nil {
t.Fatal(err)
}
// Should work
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
// Put through the cache
err = cache.Put(context.Background(), ent)
if err != nil {
t.Fatalf("err: %v", err)
}
// Read should work for both
out, err = inm.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("should have key")
}
// Delete via cache
err = cache.Delete(context.Background(), ent.Key)
if err != nil {
t.Fatal(err)
}
// Read should not work for either
out, err = inm.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("should not have key")
}
out, err = cache.Get(context.Background(), "foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("should not have key")
}
}
disabledTests()
cache.SetEnabled(true)
enabledTests()
cache.SetEnabled(false)
disabledTests()
}
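The rewritten test above leans on one property worth spelling out: when enabled, this is a write-through cache, so a value written directly to the underlying backend ("from under" the cache) is not observed through the cache until it is purged or overwritten, while with the cache disabled every call passes straight through. A toy sketch of that staleness behavior, with all names invented for the example:

package main

import "fmt"

type backend map[string]string

// writeThrough is a toy write-through cache: Put writes to both layers, Get
// prefers the cached copy and only falls back to the backend on a miss.
type writeThrough struct {
	be  backend
	lru map[string]string
}

func (c *writeThrough) Put(k, v string) {
	c.be[k] = v
	c.lru[k] = v
}

func (c *writeThrough) Get(k string) string {
	if v, ok := c.lru[k]; ok {
		return v
	}
	v := c.be[k]
	c.lru[k] = v
	return v
}

func main() {
	be := backend{}
	c := &writeThrough{be: be, lru: make(map[string]string)}

	c.Put("foo", "bar")
	be["foo"] = "foobar"      // modify the backend from under the cache
	fmt.Println(c.Get("foo")) // "bar": the stale cached value wins
}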


@ -56,10 +56,12 @@ type HABackend interface {
HAEnabled() bool
}
-// Purgable is an optional interface for backends that support
-// purging of their caches.
-type Purgable interface {
+// ToggleablePurgemonster is an interface for backends that can toggle on or
+// off special functionality and/or support purging. This is only used for the
+// cache, don't use it for other things.
+type ToggleablePurgemonster interface {
Purge(ctx context.Context)
+	SetEnabled(bool)
}
// RedirectDetect is an optional interface that an HABackend
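The renamed interface carries both halves of the cache's new contract: it can be purged and it can be switched on or off. Below is a minimal sketch of a type satisfying it, plus the optional-interface assertion pattern that PhysicalAccess.Purge relies on; noopCache and purgeIfSupported are illustrative names only.

package main

import (
	"context"
	"fmt"
)

// ToggleablePurgemonster restates the interface from the diff above.
type ToggleablePurgemonster interface {
	Purge(ctx context.Context)
	SetEnabled(bool)
}

// noopCache is a made-up implementation used only to show the shape.
type noopCache struct {
	enabled bool
	entries map[string]string
}

func (c *noopCache) Purge(ctx context.Context) { c.entries = make(map[string]string) }
func (c *noopCache) SetEnabled(on bool)        { c.enabled = on }

// Compile-time assertion, in the same style the cache diff uses.
var _ ToggleablePurgemonster = (*noopCache)(nil)

// purgeIfSupported shows the optional-interface pattern: only backends that
// actually implement the interface get purged, everything else is skipped.
func purgeIfSupported(ctx context.Context, backend interface{}) {
	if purgeable, ok := backend.(ToggleablePurgemonster); ok {
		purgeable.Purge(ctx)
	}
}

func main() {
	c := &noopCache{entries: map[string]string{"foo": "bar"}}
	purgeIfSupported(context.Background(), c)
	fmt.Println(len(c.entries)) // 0: the cache was purged via the interface
}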


@ -32,7 +32,7 @@ func (p *PhysicalAccess) List(ctx context.Context, prefix string) ([]string, err
}
func (p *PhysicalAccess) Purge(ctx context.Context) {
-	if purgeable, ok := p.physical.(Purgable); ok {
+	if purgeable, ok := p.physical.(ToggleablePurgemonster); ok {
purgeable.Purge(ctx)
}
}


@ -282,6 +282,9 @@ type Core struct {
// cachingDisabled indicates whether caches are disabled
cachingDisabled bool
+	// Cache stores the actual cache; we always have this but may bypass it if
+	// disabled
+	physicalCache physical.ToggleablePurgemonster
// reloadFuncs is a map containing reload functions
reloadFuncs map[string][]reload.ReloadFunc
@ -517,13 +520,12 @@ func NewCore(conf *CoreConfig) (*Core, error) {
var ok bool
// Wrap the physical backend in a cache layer if enabled
-	if !conf.DisableCache {
-		if txnOK {
-			c.physical = physical.NewTransactionalCache(phys, conf.CacheSize, nil, conf.Logger)
-		} else {
-			c.physical = physical.NewCache(phys, conf.CacheSize, nil, conf.Logger)
-		}
-	}
+	if txnOK {
+		c.physical = physical.NewTransactionalCache(phys, conf.CacheSize, conf.Logger)
+	} else {
+		c.physical = physical.NewCache(phys, conf.CacheSize, conf.Logger)
+	}
+	c.physicalCache = c.physical.(physical.ToggleablePurgemonster)
if !conf.DisableMlock {
// Ensure our memory usage is locked into physical RAM
@ -1573,9 +1575,9 @@ func (c *Core) postUnseal() (retErr error) {
c.clearForwardingClients()
c.requestForwardingConnectionLock.Unlock()
// Purge the backend if supported
-	if purgable, ok := c.physical.(physical.Purgable); ok {
-		purgable.Purge(c.activeContext)
-	}
+	c.physicalCache.Purge(c.activeContext)
+	if !c.cachingDisabled {
+		c.physicalCache.SetEnabled(true)
+	}
// Purge these for safety in case of a rekey
@ -1683,10 +1685,10 @@ func (c *Core) preSeal() error {
result = multierror.Append(result, err)
}
-	// Purge the backend if supported
-	if purgable, ok := c.physical.(physical.Purgable); ok {
-		purgable.Purge(c.activeContext)
-	}
+	// Purge the cache
+	c.physicalCache.SetEnabled(false)
+	c.physicalCache.Purge(c.activeContext)
c.logger.Info("core: pre-seal teardown complete")
return result
}
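The core.go changes tie the toggle to the seal lifecycle: after a successful unseal the cache is purged and then enabled (unless caching is disabled in the configuration), and before sealing or stepping down it is disabled first and then purged. A compressed sketch of that ordering, with cacheCtl standing in for the relevant slice of Core; the real postUnseal and preSeal do far more than this.

package main

import (
	"context"
	"fmt"
)

// cacheCtl is a hypothetical stand-in for the part of Core this diff touches;
// it only shows the intended ordering of the toggle and purge calls.
type cacheCtl struct {
	cachingDisabled bool
	enabled         bool
}

func (c *cacheCtl) SetEnabled(on bool)        { c.enabled = on }
func (c *cacheCtl) Purge(ctx context.Context) { /* drop all cached entries */ }

func (c *cacheCtl) postUnseal(ctx context.Context) {
	c.Purge(ctx) // start from a clean cache
	if !c.cachingDisabled {
		c.SetEnabled(true) // only now does the cache begin serving entries
	}
}

func (c *cacheCtl) preSeal(ctx context.Context) {
	c.SetEnabled(false) // stop caching before tearing down
	c.Purge(ctx)
}

func main() {
	ctx := context.Background()
	c := &cacheCtl{}
	c.postUnseal(ctx)
	fmt.Println("enabled after postUnseal:", c.enabled) // true
	c.preSeal(ctx)
	fmt.Println("enabled after preSeal:", c.enabled) // false
}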