diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go
index 7c29385d3..86273afd7 100644
--- a/helper/boltdd/boltdd.go
+++ b/helper/boltdd/boltdd.go
@@ -353,6 +353,35 @@ func (b *Bucket) Get(key []byte, obj interface{}) error {
 	return nil
 }
 
+// Iterate iterates each key in Bucket b that starts with prefix. fn is called on
+// the key and msg-pack decoded value. If prefix is empty or nil, all keys in the
+// bucket are iterated.
+func Iterate[T any](b *Bucket, prefix []byte, fn func([]byte, T)) error {
+	c := b.boltBucket.Cursor()
+	for k, data := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, data = c.Next() {
+		var obj T
+		if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(&obj); err != nil {
+			return fmt.Errorf("failed to decode data into passed object: %v", err)
+		}
+		fn(k, obj)
+	}
+	return nil
+}
+
+// DeletePrefix removes all keys starting with prefix from the bucket. If no keys
+// with prefix exist then nothing is done and a nil error is returned. Returns an
+// error if the bucket was created from a read-only transaction.
+func (b *Bucket) DeletePrefix(prefix []byte) error {
+	c := b.boltBucket.Cursor()
+	for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
+		if err := c.Delete(); err != nil {
+			return err
+		}
+		b.bm.delHash(string(k))
+	}
+	return nil
+}
+
 // Delete removes a key from the bucket. If the key does not exist then nothing
 // is done and a nil error is returned. Returns an error if the bucket was
 // created from a read-only transaction.
diff --git a/helper/boltdd/boltdd_test.go b/helper/boltdd/boltdd_test.go
index 0f217cd6a..553c39e71 100644
--- a/helper/boltdd/boltdd_test.go
+++ b/helper/boltdd/boltdd_test.go
@@ -10,21 +10,30 @@ import (
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 	"go.etcd.io/bbolt"
 )
 
+const (
+	testDB      = "nomad-test.db"
+	testDBPerms = 0600
+)
+
+// a simple struct type for testing msg pack en/decoding
+type employee struct {
+	Name string
+	ID   int
+}
+
 func setupBoltDB(t testing.TB) *DB {
 	dir := t.TempDir()
 
-	dbFilename := filepath.Join(dir, "nomadtest.db")
-	db, err := Open(dbFilename, 0600, nil)
-	if err != nil {
-		t.Fatalf("error creating boltdb: %v", err)
-	}
+	dbFilename := filepath.Join(dir, testDB)
+	db, err := Open(dbFilename, testDBPerms, nil)
+	must.NoError(t, err)
 
 	t.Cleanup(func() {
-		db.Close()
+		must.NoError(t, db.Close())
 	})
 
 	return db
@@ -32,11 +41,8 @@ func setupBoltDB(t testing.TB) *DB {
 
 func TestDB_Open(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
-
 	db := setupBoltDB(t)
-
-	require.Equal(0, db.BoltDB().Stats().TxStats.Write)
+	must.Zero(t, db.BoltDB().Stats().TxStats.Write)
 }
 
 func TestDB_Close(t *testing.T) {
@@ -44,14 +50,14 @@ func TestDB_Close(t *testing.T) {
 
 	db := setupBoltDB(t)
 
-	db.Close()
+	must.NoError(t, db.Close())
 
-	require.Equal(t, db.Update(func(tx *Tx) error {
+	must.Eq(t, db.Update(func(tx *Tx) error {
 		_, err := tx.CreateBucketIfNotExists([]byte("foo"))
 		return err
 	}), bbolt.ErrDatabaseNotOpen)
 
-	require.Equal(t, db.Update(func(tx *Tx) error {
+	must.Eq(t, db.Update(func(tx *Tx) error {
 		_, err := tx.CreateBucket([]byte("foo"))
 		return err
 	}), bbolt.ErrDatabaseNotOpen)
@@ -59,43 +65,125 @@
 
 func TestBucket_Create(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 
 	db := setupBoltDB(t)
 
 	name := []byte("create_test")
 
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		// Trying to get a nonexistent bucket should return nil
-		require.Nil(tx.Bucket(name))
+		must.Nil(t, tx.Bucket(name))
 
 		// Creating a nonexistent bucket should work
 		b, err := tx.CreateBucket(name)
-		require.NoError(err)
-		require.NotNil(b)
+		must.NoError(t, err)
+		must.NotNil(t, b)
 
 		// Recreating a bucket that exists should fail
 		b, err = tx.CreateBucket(name)
-		require.Error(err)
-		require.Nil(b)
+		must.Error(t, err)
+		must.Nil(t, b)
 
 		// get or create should work
 		b, err = tx.CreateBucketIfNotExists(name)
-		require.NoError(err)
-		require.NotNil(b)
+		must.NoError(t, err)
+		must.NotNil(t, b)
 		return nil
 	}))
 
 	// Bucket should be visible
-	require.NoError(db.View(func(tx *Tx) error {
-		require.NotNil(tx.Bucket(name))
+	must.NoError(t, db.View(func(tx *Tx) error {
+		must.NotNil(t, tx.Bucket(name))
 		return nil
 	}))
 }
 
+func TestBucket_Iterate(t *testing.T) {
+	ci.Parallel(t)
+
+	db := setupBoltDB(t)
+
+	bucket := []byte("iterate_test")
+
+	must.NoError(t, db.Update(func(tx *Tx) error {
+		b, err := tx.CreateBucketIfNotExists(bucket)
+		must.NoError(t, err)
+		must.NotNil(t, b)
+
+		must.NoError(t, b.Put([]byte("ceo"), employee{Name: "dave", ID: 15}))
+		must.NoError(t, b.Put([]byte("founder"), employee{Name: "mitchel", ID: 1}))
+		must.NoError(t, b.Put([]byte("cto"), employee{Name: "armon", ID: 2}))
+		return nil
+	}))
+
+	t.Run("success", func(t *testing.T) {
+		var result []employee
+		err := db.View(func(tx *Tx) error {
+			b := tx.Bucket(bucket)
+			return Iterate(b, nil, func(key []byte, e employee) {
+				result = append(result, e)
+			})
+		})
+		must.NoError(t, err)
+		must.Eq(t, []employee{
+			{"dave", 15}, {"armon", 2}, {"mitchel", 1},
+		}, result)
+	})
+
+	t.Run("failure", func(t *testing.T) {
+		err := db.View(func(tx *Tx) error {
+			b := tx.Bucket(bucket)
+			// will fail to decode employee into an int
			return Iterate(b, nil, func(key []byte, i int) {
+				must.True(t, false) // must not get here
+			})
+		})
+		must.Error(t, err)
+	})
+}
+
+func TestBucket_DeletePrefix(t *testing.T) {
+	ci.Parallel(t)
+
+	db := setupBoltDB(t)
+
+	bucket := []byte("delete_prefix_test")
+
+	must.NoError(t, db.Update(func(tx *Tx) error {
+		b, err := tx.CreateBucketIfNotExists(bucket)
+		must.NoError(t, err)
+		must.NotNil(t, b)
+
+		must.NoError(t, b.Put([]byte("exec_a"), employee{Name: "dave", ID: 15}))
+		must.NoError(t, b.Put([]byte("intern_a"), employee{Name: "alice", ID: 7384}))
+		must.NoError(t, b.Put([]byte("exec_c"), employee{Name: "armon", ID: 2}))
+		must.NoError(t, b.Put([]byte("intern_b"), employee{Name: "bob", ID: 7312}))
+		must.NoError(t, b.Put([]byte("exec_b"), employee{Name: "mitchel", ID: 1}))
+		return nil
+	}))
+
+	// remove interns
+	must.NoError(t, db.Update(func(tx *Tx) error {
+		bkt := tx.Bucket(bucket)
+		return bkt.DeletePrefix([]byte("intern_"))
+	}))
+
+	// assert 3 exec remain
+	var result []employee
+	err := db.View(func(tx *Tx) error {
+		bkt := tx.Bucket(bucket)
+		return Iterate(bkt, nil, func(k []byte, e employee) {
+			result = append(result, e)
+		})
+	})
+	must.NoError(t, err)
+	must.Eq(t, []employee{
+		{"dave", 15}, {"mitchel", 1}, {"armon", 2},
+	}, result)
+}
+
 func TestBucket_DedupeWrites(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 
 	db := setupBoltDB(t)
 
@@ -104,38 +192,37 @@ func TestBucket_DedupeWrites(t *testing.T) {
 	k2name := []byte("k2")
 
 	// Put 2 keys
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		b, err := tx.CreateBucket(bname)
-		require.NoError(err)
-
-		require.NoError(b.Put(k1name, k1name))
-		require.NoError(b.Put(k2name, k2name))
+		must.NoError(t, err)
+		must.NoError(t, b.Put(k1name, k1name))
+		must.NoError(t, b.Put(k2name, k2name))
 		return nil
 	}))
 
 	// Assert there was at least 1 write
 	origWrites := db.BoltDB().Stats().TxStats.Write
-	require.NotZero(origWrites)
+	must.Greater(t, origWrites, 0)
 
 	// Write the same values again and expect no new writes
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		b := tx.Bucket(bname)
-		require.NoError(b.Put(k1name, k1name))
-		require.NoError(b.Put(k2name, k2name))
+		must.NoError(t, b.Put(k1name, k1name))
+		must.NoError(t, b.Put(k2name, k2name))
 		return nil
 	}))
 
 	putWrites := db.BoltDB().Stats().TxStats.Write
 
-	// Unforunately every committed transaction causes two writes, so this
+	// Unfortunately every committed transaction causes two writes, so this
 	// only saves 1 write operation
-	require.Equal(origWrites+2, putWrites)
+	must.Eq(t, origWrites+2, putWrites)
 
 	// Write new values and assert more writes took place
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		b := tx.Bucket(bname)
-		require.NoError(b.Put(k1name, []byte("newval1")))
-		require.NoError(b.Put(k2name, []byte("newval2")))
+		must.NoError(t, b.Put(k1name, []byte("newval1")))
+		must.NoError(t, b.Put(k2name, []byte("newval2")))
 		return nil
 	}))
 
@@ -143,12 +230,11 @@
 
 	// Expect 3 additional writes: 2 for the transaction and one for the
 	// dirty page
-	require.Equal(putWrites+3, putWrites2)
+	must.Eq(t, putWrites+3, putWrites2)
 }
 
 func TestBucket_Delete(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 
 	db := setupBoltDB(t)
 
@@ -162,93 +248,92 @@
 	grandchildKey2 := []byte("grandchild_key2")
 
 	// Create a parent bucket with 1 child and 2 grandchildren
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		pb, err := tx.CreateBucket(parentName)
-		require.NoError(err)
+		must.NoError(t, err)
 
-		require.NoError(pb.Put(parentKey, parentKey))
+		must.NoError(t, pb.Put(parentKey, parentKey))
 
 		child, err := pb.CreateBucket(childName)
-		require.NoError(err)
+		must.NoError(t, err)
 
-		require.NoError(child.Put(childKey, childKey))
+		must.NoError(t, child.Put(childKey, childKey))
 
 		grandchild1, err := child.CreateBucket(grandchildName1)
-		require.NoError(err)
+		must.NoError(t, err)
 
-		require.NoError(grandchild1.Put(grandchildKey1, grandchildKey1))
+		must.NoError(t, grandchild1.Put(grandchildKey1, grandchildKey1))
 
 		grandchild2, err := child.CreateBucket(grandchildName2)
-		require.NoError(err)
+		must.NoError(t, err)
 
-		require.NoError(grandchild2.Put(grandchildKey2, grandchildKey2))
+		must.NoError(t, grandchild2.Put(grandchildKey2, grandchildKey2))
 		return nil
 	}))
 
 	// Verify grandchild keys wrote
-	require.NoError(db.View(func(tx *Tx) error {
+	must.NoError(t, db.View(func(tx *Tx) error {
 		grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
 		var v1 []byte
-		grandchild1.Get(grandchildKey1, &v1)
-		require.Equal(grandchildKey1, v1)
+		must.NoError(t, grandchild1.Get(grandchildKey1, &v1))
+		must.Eq(t, grandchildKey1, v1)
 
 		grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
 		var v2 []byte
-		grandchild2.Get(grandchildKey2, &v2)
-		require.Equal(grandchildKey2, v2)
+		must.NoError(t, grandchild2.Get(grandchildKey2, &v2))
+		must.Eq(t, grandchildKey2, v2)
 		return nil
 	}))
 
 	// Delete grandchildKey1 and grandchild2
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		child := tx.Bucket(parentName).Bucket(childName)
-
-		require.NoError(child.DeleteBucket(grandchildName2))
+		must.NoError(t, child.DeleteBucket(grandchildName2))
 
 		grandchild1 := child.Bucket(grandchildName1)
-		require.NoError(grandchild1.Delete(grandchildKey1))
+		must.NoError(t, grandchild1.Delete(grandchildKey1))
 		return nil
 	}))
 
 	// Ensure grandchild2 alone was deleted
-	require.NoError(db.View(func(tx *Tx) error {
+	must.NoError(t, db.View(func(tx *Tx) error {
 		grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
 		var v1 []byte
-		grandchild1.Get(grandchildKey1, &v1)
-		require.Equal(([]byte)(nil), v1)
+		must.Error(t, grandchild1.Get(grandchildKey1, &v1))
+		must.Eq(t, ([]byte)(nil), v1)
 
 		grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
-		require.Nil(grandchild2)
+		must.Nil(t, grandchild2)
 		return nil
 	}))
 
 	// Deleting child bucket should delete grandchild1 as well
-	require.NoError(db.Update(func(tx *Tx) error {
+	must.NoError(t, db.Update(func(tx *Tx) error {
 		parent := tx.Bucket(parentName)
-		require.NoError(parent.DeleteBucket(childName))
+		must.NoError(t, parent.DeleteBucket(childName))
 
 		// Recreate child bucket and ensure childKey and grandchild are gone
 		child, err := parent.CreateBucket(childName)
-		require.NoError(err)
+		must.NoError(t, err)
 
 		var v []byte
 		err = child.Get(childKey, &v)
-		require.Error(err)
-		require.True(IsErrNotFound(err))
-		require.Equal(([]byte)(nil), v)
+		must.Error(t, err)
+		must.True(t, IsErrNotFound(err))
+		must.Eq(t, ([]byte)(nil), v)
 
-		require.Nil(child.Bucket(grandchildName1))
+		must.Nil(t, child.Bucket(grandchildName1))
 
-		// Rewrite childKey1 to make sure it doesn't get dedupe incorrectly
-		require.NoError(child.Put(childKey, childKey))
+		// Rewrite childKey1 to make sure it doesn't get de-duped incorrectly
+		must.NoError(t, child.Put(childKey, childKey))
 		return nil
 	}))
 
-	// Ensure childKey1 was rewritten and not deduped incorrectly
-	require.NoError(db.View(func(tx *Tx) error {
+	// Ensure childKey1 was rewritten and not de-duped incorrectly
+	must.NoError(t, db.View(func(tx *Tx) error {
 		var v []byte
-		require.NoError(tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
-		require.Equal(childKey, v)
+		must.NoError(t, tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
+		must.Eq(t, childKey, v)
 		return nil
 	}))
 }
@@ -260,77 +345,60 @@
 	alloc := mock.Alloc()
 	allocID := []byte(alloc.ID)
 
-	err := db.Update(func(tx *Tx) error {
+	must.NoError(b, db.Update(func(tx *Tx) error {
 		allocs, err := tx.CreateBucket(bucketName)
 		if err != nil {
 			return err
 		}
 
 		return allocs.Put(allocID, alloc)
-	})
-
-	if err != nil {
-		b.Fatalf("error setting up: %v", err)
-	}
+	}))
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		err := db.Update(func(tx *Tx) error {
+		must.NoError(b, db.Update(func(tx *Tx) error {
 			return tx.Bucket(bucketName).Put(allocID, alloc)
-		})
-
-		if err != nil {
-			b.Fatalf("error at runtime: %v", err)
-		}
+		}))
 	}
 }
 
 func BenchmarkWriteDeduplication_Off(b *testing.B) {
 	dir := b.TempDir()
 
-	dbFilename := filepath.Join(dir, "nomadtest.db")
-	db, err := Open(dbFilename, 0600, nil)
-	if err != nil {
-		b.Fatalf("error creating boltdb: %v", err)
-	}
+	dbFilename := filepath.Join(dir, testDB)
+	db, openErr := Open(dbFilename, testDBPerms, nil)
+	must.NoError(b, openErr)
 
-	defer db.Close()
+	b.Cleanup(func() {
+		must.NoError(b, db.Close())
+	})
 
 	bucketName := []byte("allocations")
 	alloc := mock.Alloc()
 	allocID := []byte(alloc.ID)
 
-	err = db.Update(func(tx *Tx) error {
+	must.NoError(b, db.Update(func(tx *Tx) error {
 		allocs, err := tx.CreateBucket(bucketName)
 		if err != nil {
 			return err
 		}
 
 		var buf bytes.Buffer
-		if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
+		if err = codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
 			return fmt.Errorf("failed to encode passed object: %v", err)
 		}
 
 		return allocs.Put(allocID, buf)
-	})
-
-	if err != nil {
-		b.Fatalf("error setting up: %v", err)
-	}
+	}))
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		err := db.Update(func(tx *Tx) error {
+		must.NoError(b, db.Update(func(tx *Tx) error {
 			var buf bytes.Buffer
 			if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
 				return fmt.Errorf("failed to encode passed object: %v", err)
 			}
-
 			return tx.Bucket(bucketName).Put(allocID, buf)
-		})
-
-		if err != nil {
-			b.Fatalf("error at runtime: %v", err)
-		}
+		}))
 	}
 }
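
Usage sketch (not part of the patch): assuming an open `boltdd.DB`, the new package-level generic `Iterate` and the `Bucket.DeletePrefix` method might be combined as below. The file name, bucket name, and `record` type are illustrative only and do not appear in the change; `Iterate` is presumably a free function rather than a method because Go methods cannot declare their own type parameters.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/helper/boltdd"
)

// record is a hypothetical value type; Bucket.Put msgpack-encodes it for us.
type record struct {
	Name string
	ID   int
}

func main() {
	// Open a throwaway database file (path is illustrative).
	db, err := boltdd.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	bucket := []byte("example_bucket")

	// Seed a few keys, two of them sharing the "tmp_" prefix.
	err = db.Update(func(tx *boltdd.Tx) error {
		b, err := tx.CreateBucketIfNotExists(bucket)
		if err != nil {
			return err
		}
		if err := b.Put([]byte("tmp_a"), record{Name: "a", ID: 1}); err != nil {
			return err
		}
		if err := b.Put([]byte("tmp_b"), record{Name: "b", ID: 2}); err != nil {
			return err
		}
		return b.Put([]byte("keep_c"), record{Name: "c", ID: 3})
	})
	if err != nil {
		log.Fatal(err)
	}

	// Remove every key under the "tmp_" prefix in one write transaction.
	err = db.Update(func(tx *boltdd.Tx) error {
		return tx.Bucket(bucket).DeletePrefix([]byte("tmp_"))
	})
	if err != nil {
		log.Fatal(err)
	}

	// Walk what is left; a nil prefix iterates the whole bucket, and the
	// decoded value type is inferred from the callback signature.
	err = db.View(func(tx *boltdd.Tx) error {
		return boltdd.Iterate(tx.Bucket(bucket), nil, func(k []byte, r record) {
			fmt.Printf("%s => %+v\n", k, r)
		})
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

As in the tests above, `Bucket.Put` handles the msgpack encoding, so the `Iterate` callback only needs a value type that the stored bytes decode into.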