boltdd: add iterate and prefix deletion helpers
This PR adds 2 helper functions to the helpers/bbolt package - Iterate: iterate every key in a bucket. Automatically decodes the msg pack value into the provided value argument. - DeletePrefix: deletes every key in a bucket starting with a given prefix. Manages the wrapper's hash values accordingly. Uses a cursor & sync to operate efficiently.
This commit is contained in:
parent
646ac6e38e
commit
a49b98ac6b
|
@ -353,6 +353,35 @@ func (b *Bucket) Get(key []byte, obj interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Iterate iterates each key in Bucket b that starts with prefix. fn is called on
|
||||
// the key and msg-pack decoded value. If prefix is empty or nil, all keys in the
|
||||
// bucket are iterated.
|
||||
func Iterate[T any](b *Bucket, prefix []byte, fn func([]byte, T)) error {
|
||||
c := b.boltBucket.Cursor()
|
||||
for k, data := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, data = c.Next() {
|
||||
var obj T
|
||||
if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(&obj); err != nil {
|
||||
return fmt.Errorf("failed to decode data into passed object: %v", err)
|
||||
}
|
||||
fn(k, obj)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePrefix removes all keys starting with prefix from the bucket. If no keys
|
||||
// with prefix exist then nothing is done and a nil error is returned. Returns an
|
||||
// error if the bucket was created from a read-only transaction.
|
||||
func (b *Bucket) DeletePrefix(prefix []byte) error {
|
||||
c := b.boltBucket.Cursor()
|
||||
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
|
||||
if err := c.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
b.bm.delHash(string(k))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes a key from the bucket. If the key does not exist then nothing
|
||||
// is done and a nil error is returned. Returns an error if the bucket was
|
||||
// created from a read-only transaction.
|
||||
|
|
|
@ -10,21 +10,30 @@ import (
|
|||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/shoenig/test/must"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const (
|
||||
testDB = "nomad-test.db"
|
||||
testDBPerms = 0600
|
||||
)
|
||||
|
||||
// a simple struct type for testing msg pack en/decoding
|
||||
type employee struct {
|
||||
Name string
|
||||
ID int
|
||||
}
|
||||
|
||||
func setupBoltDB(t testing.TB) *DB {
|
||||
dir := t.TempDir()
|
||||
|
||||
dbFilename := filepath.Join(dir, "nomadtest.db")
|
||||
db, err := Open(dbFilename, 0600, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating boltdb: %v", err)
|
||||
}
|
||||
dbFilename := filepath.Join(dir, testDB)
|
||||
db, err := Open(dbFilename, testDBPerms, nil)
|
||||
must.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
db.Close()
|
||||
must.NoError(t, db.Close())
|
||||
})
|
||||
|
||||
return db
|
||||
|
@ -32,11 +41,8 @@ func setupBoltDB(t testing.TB) *DB {
|
|||
|
||||
func TestDB_Open(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
require.Equal(0, db.BoltDB().Stats().TxStats.Write)
|
||||
must.Zero(t, db.BoltDB().Stats().TxStats.Write)
|
||||
}
|
||||
|
||||
func TestDB_Close(t *testing.T) {
|
||||
|
@ -44,14 +50,14 @@ func TestDB_Close(t *testing.T) {
|
|||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
db.Close()
|
||||
must.NoError(t, db.Close())
|
||||
|
||||
require.Equal(t, db.Update(func(tx *Tx) error {
|
||||
must.Eq(t, db.Update(func(tx *Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte("foo"))
|
||||
return err
|
||||
}), bbolt.ErrDatabaseNotOpen)
|
||||
|
||||
require.Equal(t, db.Update(func(tx *Tx) error {
|
||||
must.Eq(t, db.Update(func(tx *Tx) error {
|
||||
_, err := tx.CreateBucket([]byte("foo"))
|
||||
return err
|
||||
}), bbolt.ErrDatabaseNotOpen)
|
||||
|
@ -59,43 +65,125 @@ func TestDB_Close(t *testing.T) {
|
|||
|
||||
func TestBucket_Create(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
name := []byte("create_test")
|
||||
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
// Trying to get a nonexistent bucket should return nil
|
||||
require.Nil(tx.Bucket(name))
|
||||
must.Nil(t, tx.Bucket(name))
|
||||
|
||||
// Creating a nonexistent bucket should work
|
||||
b, err := tx.CreateBucket(name)
|
||||
require.NoError(err)
|
||||
require.NotNil(b)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, b)
|
||||
|
||||
// Recreating a bucket that exists should fail
|
||||
b, err = tx.CreateBucket(name)
|
||||
require.Error(err)
|
||||
require.Nil(b)
|
||||
must.Error(t, err)
|
||||
must.Nil(t, b)
|
||||
|
||||
// get or create should work
|
||||
b, err = tx.CreateBucketIfNotExists(name)
|
||||
require.NoError(err)
|
||||
require.NotNil(b)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, b)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Bucket should be visible
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
require.NotNil(tx.Bucket(name))
|
||||
must.NoError(t, db.View(func(tx *Tx) error {
|
||||
must.NotNil(t, tx.Bucket(name))
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func TestBucket_Iterate(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
bucket := []byte("iterate_test")
|
||||
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(bucket)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, b)
|
||||
|
||||
must.NoError(t, b.Put([]byte("ceo"), employee{Name: "dave", ID: 15}))
|
||||
must.NoError(t, b.Put([]byte("founder"), employee{Name: "mitchel", ID: 1}))
|
||||
must.NoError(t, b.Put([]byte("cto"), employee{Name: "armon", ID: 2}))
|
||||
return nil
|
||||
}))
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
var result []employee
|
||||
err := db.View(func(tx *Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
return Iterate(b, nil, func(key []byte, e employee) {
|
||||
result = append(result, e)
|
||||
})
|
||||
})
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, []employee{
|
||||
{"dave", 15}, {"armon", 2}, {"mitchel", 1},
|
||||
}, result)
|
||||
})
|
||||
|
||||
t.Run("failure", func(t *testing.T) {
|
||||
err := db.View(func(tx *Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
// will fail to encode employee into an int
|
||||
return Iterate(b, nil, func(key []byte, i int) {
|
||||
must.True(t, false) // must not get here
|
||||
})
|
||||
})
|
||||
must.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBucket_DeletePrefix(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
bucket := []byte("delete_prefix_test")
|
||||
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(bucket)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, b)
|
||||
|
||||
must.NoError(t, b.Put([]byte("exec_a"), employee{Name: "dave", ID: 15}))
|
||||
must.NoError(t, b.Put([]byte("intern_a"), employee{Name: "alice", ID: 7384}))
|
||||
must.NoError(t, b.Put([]byte("exec_c"), employee{Name: "armon", ID: 2}))
|
||||
must.NoError(t, b.Put([]byte("intern_b"), employee{Name: "bob", ID: 7312}))
|
||||
must.NoError(t, b.Put([]byte("exec_b"), employee{Name: "mitchel", ID: 1}))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// remove interns
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
bkt := tx.Bucket(bucket)
|
||||
return bkt.DeletePrefix([]byte("intern_"))
|
||||
}))
|
||||
|
||||
// assert 3 exec remain
|
||||
var result []employee
|
||||
err := db.View(func(tx *Tx) error {
|
||||
bkt := tx.Bucket(bucket)
|
||||
return Iterate(bkt, nil, func(k []byte, e employee) {
|
||||
result = append(result, e)
|
||||
})
|
||||
})
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, []employee{
|
||||
{"dave", 15}, {"mitchel", 1}, {"armon", 2},
|
||||
}, result)
|
||||
}
|
||||
|
||||
func TestBucket_DedupeWrites(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
|
@ -104,38 +192,37 @@ func TestBucket_DedupeWrites(t *testing.T) {
|
|||
k2name := []byte("k2")
|
||||
|
||||
// Put 2 keys
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
b, err := tx.CreateBucket(bname)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(b.Put(k1name, k1name))
|
||||
require.NoError(b.Put(k2name, k2name))
|
||||
must.NoError(t, err)
|
||||
must.NoError(t, b.Put(k1name, k1name))
|
||||
must.NoError(t, b.Put(k2name, k2name))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Assert there was at least 1 write
|
||||
origWrites := db.BoltDB().Stats().TxStats.Write
|
||||
require.NotZero(origWrites)
|
||||
must.Greater(t, origWrites, 0)
|
||||
|
||||
// Write the same values again and expect no new writes
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
b := tx.Bucket(bname)
|
||||
require.NoError(b.Put(k1name, k1name))
|
||||
require.NoError(b.Put(k2name, k2name))
|
||||
must.NoError(t, b.Put(k1name, k1name))
|
||||
must.NoError(t, b.Put(k2name, k2name))
|
||||
return nil
|
||||
}))
|
||||
|
||||
putWrites := db.BoltDB().Stats().TxStats.Write
|
||||
|
||||
// Unforunately every committed transaction causes two writes, so this
|
||||
// Unfortunately every committed transaction causes two writes, so this
|
||||
// only saves 1 write operation
|
||||
require.Equal(origWrites+2, putWrites)
|
||||
must.Eq(t, origWrites+2, putWrites)
|
||||
|
||||
// Write new values and assert more writes took place
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
b := tx.Bucket(bname)
|
||||
require.NoError(b.Put(k1name, []byte("newval1")))
|
||||
require.NoError(b.Put(k2name, []byte("newval2")))
|
||||
must.NoError(t, b.Put(k1name, []byte("newval1")))
|
||||
must.NoError(t, b.Put(k2name, []byte("newval2")))
|
||||
return nil
|
||||
}))
|
||||
|
||||
|
@ -143,12 +230,11 @@ func TestBucket_DedupeWrites(t *testing.T) {
|
|||
|
||||
// Expect 3 additional writes: 2 for the transaction and one for the
|
||||
// dirty page
|
||||
require.Equal(putWrites+3, putWrites2)
|
||||
must.Eq(t, putWrites+3, putWrites2)
|
||||
}
|
||||
|
||||
func TestBucket_Delete(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
||||
db := setupBoltDB(t)
|
||||
|
||||
|
@ -162,93 +248,92 @@ func TestBucket_Delete(t *testing.T) {
|
|||
grandchildKey2 := []byte("grandchild_key2")
|
||||
|
||||
// Create a parent bucket with 1 child and 2 grandchildren
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
pb, err := tx.CreateBucket(parentName)
|
||||
require.NoError(err)
|
||||
must.NoError(t, err)
|
||||
|
||||
require.NoError(pb.Put(parentKey, parentKey))
|
||||
must.NoError(t, pb.Put(parentKey, parentKey))
|
||||
|
||||
child, err := pb.CreateBucket(childName)
|
||||
require.NoError(err)
|
||||
must.NoError(t, err)
|
||||
|
||||
require.NoError(child.Put(childKey, childKey))
|
||||
must.NoError(t, child.Put(childKey, childKey))
|
||||
|
||||
grandchild1, err := child.CreateBucket(grandchildName1)
|
||||
require.NoError(err)
|
||||
must.NoError(t, err)
|
||||
|
||||
require.NoError(grandchild1.Put(grandchildKey1, grandchildKey1))
|
||||
must.NoError(t, grandchild1.Put(grandchildKey1, grandchildKey1))
|
||||
|
||||
grandchild2, err := child.CreateBucket(grandchildName2)
|
||||
require.NoError(err)
|
||||
must.NoError(t, err)
|
||||
|
||||
require.NoError(grandchild2.Put(grandchildKey2, grandchildKey2))
|
||||
must.NoError(t, grandchild2.Put(grandchildKey2, grandchildKey2))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Verify grandchild keys wrote
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
must.NoError(t, db.View(func(tx *Tx) error {
|
||||
grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
|
||||
var v1 []byte
|
||||
grandchild1.Get(grandchildKey1, &v1)
|
||||
require.Equal(grandchildKey1, v1)
|
||||
must.NoError(t, grandchild1.Get(grandchildKey1, &v1))
|
||||
must.Eq(t, grandchildKey1, v1)
|
||||
|
||||
grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
|
||||
var v2 []byte
|
||||
grandchild2.Get(grandchildKey2, &v2)
|
||||
require.Equal(grandchildKey2, v2)
|
||||
must.NoError(t, grandchild2.Get(grandchildKey2, &v2))
|
||||
must.Eq(t, grandchildKey2, v2)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Delete grandchildKey1 and grandchild2
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
child := tx.Bucket(parentName).Bucket(childName)
|
||||
|
||||
require.NoError(child.DeleteBucket(grandchildName2))
|
||||
must.NoError(t, child.DeleteBucket(grandchildName2))
|
||||
|
||||
grandchild1 := child.Bucket(grandchildName1)
|
||||
require.NoError(grandchild1.Delete(grandchildKey1))
|
||||
must.NoError(t, grandchild1.Delete(grandchildKey1))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Ensure grandchild2 alone was deleted
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
must.NoError(t, db.View(func(tx *Tx) error {
|
||||
grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
|
||||
var v1 []byte
|
||||
grandchild1.Get(grandchildKey1, &v1)
|
||||
require.Equal(([]byte)(nil), v1)
|
||||
must.Error(t, grandchild1.Get(grandchildKey1, &v1))
|
||||
must.Eq(t, ([]byte)(nil), v1)
|
||||
|
||||
grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
|
||||
require.Nil(grandchild2)
|
||||
must.Nil(t, grandchild2)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Deleting child bucket should delete grandchild1 as well
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
must.NoError(t, db.Update(func(tx *Tx) error {
|
||||
parent := tx.Bucket(parentName)
|
||||
require.NoError(parent.DeleteBucket(childName))
|
||||
must.NoError(t, parent.DeleteBucket(childName))
|
||||
|
||||
// Recreate child bucket and ensure childKey and grandchild are gone
|
||||
child, err := parent.CreateBucket(childName)
|
||||
require.NoError(err)
|
||||
must.NoError(t, err)
|
||||
|
||||
var v []byte
|
||||
err = child.Get(childKey, &v)
|
||||
require.Error(err)
|
||||
require.True(IsErrNotFound(err))
|
||||
require.Equal(([]byte)(nil), v)
|
||||
must.Error(t, err)
|
||||
must.True(t, IsErrNotFound(err))
|
||||
must.Eq(t, ([]byte)(nil), v)
|
||||
|
||||
require.Nil(child.Bucket(grandchildName1))
|
||||
must.Nil(t, child.Bucket(grandchildName1))
|
||||
|
||||
// Rewrite childKey1 to make sure it doesn't get dedupe incorrectly
|
||||
require.NoError(child.Put(childKey, childKey))
|
||||
// Rewrite childKey1 to make sure it doesn't get de-dupe incorrectly
|
||||
must.NoError(t, child.Put(childKey, childKey))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Ensure childKey1 was rewritten and not deduped incorrectly
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
// Ensure childKey1 was rewritten and not de-duped incorrectly
|
||||
must.NoError(t, db.View(func(tx *Tx) error {
|
||||
var v []byte
|
||||
require.NoError(tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
|
||||
require.Equal(childKey, v)
|
||||
must.NoError(t, tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
|
||||
must.Eq(t, childKey, v)
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
@ -260,77 +345,60 @@ func BenchmarkWriteDeduplication_On(b *testing.B) {
|
|||
alloc := mock.Alloc()
|
||||
allocID := []byte(alloc.ID)
|
||||
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
must.NoError(b, db.Update(func(tx *Tx) error {
|
||||
allocs, err := tx.CreateBucket(bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return allocs.Put(allocID, alloc)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error setting up: %v", err)
|
||||
}
|
||||
}))
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
must.NoError(b, db.Update(func(tx *Tx) error {
|
||||
return tx.Bucket(bucketName).Put(allocID, alloc)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error at runtime: %v", err)
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWriteDeduplication_Off(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
|
||||
dbFilename := filepath.Join(dir, "nomadtest.db")
|
||||
db, err := Open(dbFilename, 0600, nil)
|
||||
if err != nil {
|
||||
b.Fatalf("error creating boltdb: %v", err)
|
||||
}
|
||||
dbFilename := filepath.Join(dir, testDB)
|
||||
db, openErr := Open(dbFilename, testDBPerms, nil)
|
||||
must.NoError(b, openErr)
|
||||
|
||||
defer db.Close()
|
||||
b.Cleanup(func() {
|
||||
must.NoError(b, db.Close())
|
||||
})
|
||||
|
||||
bucketName := []byte("allocations")
|
||||
alloc := mock.Alloc()
|
||||
allocID := []byte(alloc.ID)
|
||||
|
||||
err = db.Update(func(tx *Tx) error {
|
||||
must.NoError(b, db.Update(func(tx *Tx) error {
|
||||
allocs, err := tx.CreateBucket(bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
|
||||
if err = codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
return allocs.Put(allocID, buf)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error setting up: %v", err)
|
||||
}
|
||||
}))
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
must.NoError(b, db.Update(func(tx *Tx) error {
|
||||
var buf bytes.Buffer
|
||||
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
return tx.Bucket(bucketName).Put(allocID, buf)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error at runtime: %v", err)
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue