wrap boltdb in a write deduplicator
Saves a tiny bit of cpu and some IO. Sadly doesn't prevent all IO on duplicate writes as the transactions are still created and committed. $ go test -bench=. -benchmem goos: linux goarch: amd64 pkg: github.com/hashicorp/nomad/helper/boltdd BenchmarkWriteDeduplication_On-4 500 4059591 ns/op 23736 B/op 56 allocs/op BenchmarkWriteDeduplication_Off-4 300 4115319 ns/op 25942 B/op 55 allocs/op
This commit is contained in:
parent
990228a6e2
commit
820af27171
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -432,15 +431,16 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
|||
|
||||
// DestroyState is used to cleanup after ourselves
|
||||
func (r *AllocRunner) DestroyState() error {
|
||||
r.allocStateLock.Lock()
|
||||
defer r.allocStateLock.Unlock()
|
||||
//r.allocStateLock.Lock()
|
||||
//defer r.allocStateLock.Unlock()
|
||||
|
||||
return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil {
|
||||
return fmt.Errorf("failed to delete allocation bucket: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
//return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
// if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil {
|
||||
// return fmt.Errorf("failed to delete allocation bucket: %v", err)
|
||||
// }
|
||||
// return nil
|
||||
//})
|
||||
panic("deprecated: use allocrunnerv2")
|
||||
}
|
||||
|
||||
// DestroyContext is used to destroy the context
|
||||
|
|
|
@ -26,7 +26,6 @@ import (
|
|||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/ugorji/go/codec"
|
||||
|
@ -536,15 +535,17 @@ func (r *TaskRunner) SaveState() error {
|
|||
|
||||
// DestroyState is used to cleanup after ourselves
|
||||
func (r *TaskRunner) DestroyState() error {
|
||||
r.persistLock.Lock()
|
||||
defer r.persistLock.Unlock()
|
||||
//r.persistLock.Lock()
|
||||
//defer r.persistLock.Unlock()
|
||||
|
||||
return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
|
||||
return fmt.Errorf("failed to delete task bucket: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
//return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
// if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
|
||||
// return fmt.Errorf("failed to delete task bucket: %v", err)
|
||||
// }
|
||||
// return nil
|
||||
//})
|
||||
//XXX Deprecated: see allocrunnerv2
|
||||
panic("deprecated")
|
||||
}
|
||||
|
||||
// setState is used to update the state of the task runner
|
||||
|
|
|
@ -1,132 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/ugorji/go/codec"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
)
|
||||
|
||||
type kvStore interface {
|
||||
Path() []byte
|
||||
Bucket(name []byte) kvStore
|
||||
CreateBucket(key []byte) (kvStore, error)
|
||||
CreateBucketIfNotExists(key []byte) (kvStore, error)
|
||||
DeleteBucket(key []byte) error
|
||||
Get(key []byte) (val []byte)
|
||||
Put(key, val []byte) error
|
||||
}
|
||||
|
||||
// keyValueCodec handles encoding and decoding values from a key/value store
|
||||
// such as boltdb.
|
||||
type keyValueCodec struct {
|
||||
// hashes maps buckets to keys to the hash of the last content written:
|
||||
// bucket -> key -> hash for example:
|
||||
// allocations/1234 -> alloc -> abcd
|
||||
// allocations/1234/redis -> task_state -> efff
|
||||
hashes map[string]map[string][]byte
|
||||
hashesLock sync.Mutex
|
||||
}
|
||||
|
||||
func newKeyValueCodec() *keyValueCodec {
|
||||
return &keyValueCodec{
|
||||
hashes: make(map[string]map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
// Put into kv store iff it has changed since the last write. A globally
|
||||
// unique key is constructed for each value by concatinating the path and key
|
||||
// passed in.
|
||||
func (c *keyValueCodec) Put(bkt kvStore, key []byte, val interface{}) error {
|
||||
// buffer for writing serialized state to
|
||||
var buf bytes.Buffer
|
||||
|
||||
// Hash for skipping unnecessary writes
|
||||
h, err := blake2b.New256(nil)
|
||||
if err != nil {
|
||||
// Programming error that should never happen!
|
||||
return err
|
||||
}
|
||||
|
||||
// Multiplex writes to both hasher and buffer
|
||||
w := io.MultiWriter(h, &buf)
|
||||
|
||||
// Serialize the object
|
||||
if err := codec.NewEncoder(w, structs.MsgpackHandle).Encode(val); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
// If the hashes are equal, skip the write
|
||||
hashPath := string(bkt.Path())
|
||||
hashKey := string(key)
|
||||
hashVal := h.Sum(nil)
|
||||
|
||||
// lastHash value or nil if it hasn't been hashed yet
|
||||
var lastHash []byte
|
||||
|
||||
c.hashesLock.Lock()
|
||||
if hashBkt, ok := c.hashes[hashPath]; ok {
|
||||
lastHash = hashBkt[hashKey]
|
||||
} else {
|
||||
// Create hash bucket
|
||||
c.hashes[hashPath] = make(map[string][]byte, 2)
|
||||
}
|
||||
c.hashesLock.Unlock()
|
||||
|
||||
if bytes.Equal(hashVal, lastHash) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// New value: write it to the underlying store
|
||||
if err := bkt.Put(key, buf.Bytes()); err != nil {
|
||||
return fmt.Errorf("failed to write data at key %s: %v", key, err)
|
||||
}
|
||||
|
||||
// New value written, store hash (bucket path map was created above)
|
||||
c.hashesLock.Lock()
|
||||
c.hashes[hashPath][hashKey] = hashVal
|
||||
c.hashesLock.Unlock()
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// Get value by key from boltdb.
|
||||
func (c *keyValueCodec) Get(bkt kvStore, key []byte, obj interface{}) error {
|
||||
// Get the data
|
||||
data := bkt.Get(key)
|
||||
if data == nil {
|
||||
return fmt.Errorf("no data at key %v", string(key))
|
||||
}
|
||||
|
||||
// Deserialize the object
|
||||
if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil {
|
||||
return fmt.Errorf("failed to decode data into passed object: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteBucket or do nothing if bucket doesn't exist.
|
||||
func (c *keyValueCodec) DeleteBucket(parent kvStore, bktName []byte) error {
|
||||
// Get the path of the bucket being deleted
|
||||
bkt := parent.Bucket(bktName)
|
||||
if bkt == nil {
|
||||
// Doesn't exist! Nothing to delete
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the bucket
|
||||
err := parent.DeleteBucket(bktName)
|
||||
|
||||
// Always purge all corresponding hashes to prevent memory leaks
|
||||
c.hashesLock.Lock()
|
||||
delete(c.hashes, string(bkt.Path()))
|
||||
c.hashesLock.Unlock()
|
||||
|
||||
return err
|
||||
}
|
|
@ -1,80 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// mockKVStore tracks puts and is useful for testing KVCodec's write-on-change
|
||||
// code.
|
||||
type mockKVStore struct {
|
||||
puts int
|
||||
}
|
||||
|
||||
func (mockKVStore) Path() []byte {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
func (m *mockKVStore) Bucket(name []byte) kvStore {
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *mockKVStore) CreateBucket(key []byte) (kvStore, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *mockKVStore) CreateBucketIfNotExists(key []byte) (kvStore, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *mockKVStore) DeleteBucket(key []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockKVStore) Get(key []byte) (val []byte) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockKVStore) Put(key, val []byte) error {
|
||||
m.puts++
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestKVCodec_PutHash asserts that Puts on the underlying kvstore only occur
|
||||
// when the data actually changes.
|
||||
func TestKVCodec_PutHash(t *testing.T) {
|
||||
require := require.New(t)
|
||||
codec := newKeyValueCodec()
|
||||
|
||||
// Create arguments for Put
|
||||
kv := new(mockKVStore)
|
||||
key := []byte("key1")
|
||||
val := &struct {
|
||||
Val int
|
||||
}{
|
||||
Val: 1,
|
||||
}
|
||||
|
||||
// Initial Put should be written
|
||||
require.NoError(codec.Put(kv, key, val))
|
||||
require.Equal(1, kv.puts)
|
||||
|
||||
// Writing the same values again should be a noop
|
||||
require.NoError(codec.Put(kv, key, val))
|
||||
require.Equal(1, kv.puts)
|
||||
|
||||
// Changing the value should write again
|
||||
val.Val++
|
||||
require.NoError(codec.Put(kv, key, val))
|
||||
require.Equal(2, kv.puts)
|
||||
|
||||
// Changing the key should write again
|
||||
key = []byte("key2")
|
||||
require.NoError(codec.Put(kv, key, val))
|
||||
require.Equal(3, kv.puts)
|
||||
|
||||
// Writing the same values again should be a noop
|
||||
require.NoError(codec.Put(kv, key, val))
|
||||
require.Equal(3, kv.puts)
|
||||
}
|
|
@ -1,123 +0,0 @@
|
|||
package state
|
||||
|
||||
import "github.com/boltdb/bolt"
|
||||
|
||||
// namedBucket is a wrapper around bolt.Bucket's to preserve their path
|
||||
// information and expose it via the Path() method.
|
||||
//
|
||||
// Knowing the full bucket path to a key is necessary for tracking accesses in
|
||||
// another datastructure such as the hashing writer keyValueCodec.
|
||||
type namedBucket struct {
|
||||
path []byte
|
||||
name []byte
|
||||
bkt *bolt.Bucket
|
||||
}
|
||||
|
||||
// newNamedBucket from a bolt transaction.
|
||||
func newNamedBucket(tx *bolt.Tx, root []byte) *namedBucket {
|
||||
b := tx.Bucket(root)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &namedBucket{
|
||||
path: root,
|
||||
name: root,
|
||||
bkt: b,
|
||||
}
|
||||
}
|
||||
|
||||
// createNamedBucketIfNotExists from a bolt transaction.
|
||||
func createNamedBucketIfNotExists(tx *bolt.Tx, root []byte) (*namedBucket, error) {
|
||||
b, err := tx.CreateBucketIfNotExists(root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &namedBucket{
|
||||
path: root,
|
||||
name: root,
|
||||
bkt: b,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Path to this bucket (including this bucket).
|
||||
func (n *namedBucket) Path() []byte {
|
||||
return n.path
|
||||
}
|
||||
|
||||
// Name of this bucket.
|
||||
func (n *namedBucket) Name() []byte {
|
||||
return n.name
|
||||
}
|
||||
|
||||
// Bucket returns a bucket inside the current one or nil if the bucket does not
|
||||
// exist.
|
||||
func (n *namedBucket) Bucket(name []byte) kvStore {
|
||||
b := n.bkt.Bucket(name)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &namedBucket{
|
||||
path: n.chBkt(name),
|
||||
name: name,
|
||||
bkt: b,
|
||||
}
|
||||
}
|
||||
|
||||
// CreateBucketIfNotExists creates a bucket if it doesn't exist and returns it
|
||||
// or an error.
|
||||
func (n *namedBucket) CreateBucketIfNotExists(name []byte) (kvStore, error) {
|
||||
b, err := n.bkt.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &namedBucket{
|
||||
path: n.chBkt(name),
|
||||
name: name,
|
||||
bkt: b,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateBucket creates a bucket and returns it.
|
||||
func (n *namedBucket) CreateBucket(name []byte) (kvStore, error) {
|
||||
b, err := n.bkt.CreateBucket(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &namedBucket{
|
||||
path: n.chBkt(name),
|
||||
name: name,
|
||||
bkt: b,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteBucket calls DeleteBucket on the underlying bolt.Bucket.
|
||||
func (n *namedBucket) DeleteBucket(name []byte) error {
|
||||
return n.bkt.DeleteBucket(name)
|
||||
}
|
||||
|
||||
// Get calls Get on the underlying bolt.Bucket.
|
||||
func (n *namedBucket) Get(key []byte) []byte {
|
||||
return n.bkt.Get(key)
|
||||
}
|
||||
|
||||
// Put calls Put on the underlying bolt.Bucket.
|
||||
func (n *namedBucket) Put(key, value []byte) error {
|
||||
return n.bkt.Put(key, value)
|
||||
}
|
||||
|
||||
// chBkt is like chdir but for buckets: it appends the new name to the end of
|
||||
// a copy of the path and returns it.
|
||||
func (n *namedBucket) chBkt(name []byte) []byte {
|
||||
// existing path + new path element + path separator
|
||||
path := make([]byte, len(n.path)+len(name)+1)
|
||||
copy(path[0:len(n.path)], n.path)
|
||||
path[len(n.path)] = '/'
|
||||
copy(path[len(n.path)+1:], name)
|
||||
|
||||
return path
|
||||
}
|
|
@ -1,151 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest_")
|
||||
require.NoError(t, err)
|
||||
cleanup := func() {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Logf("error removing test dir: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
dbFilename := filepath.Join(dir, "nomadtest.db")
|
||||
db, err := bolt.Open(dbFilename, 0600, nil)
|
||||
if err != nil {
|
||||
cleanup()
|
||||
t.Fatalf("error creating boltdb: %v", err)
|
||||
}
|
||||
|
||||
return db, func() {
|
||||
db.Close()
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
// TestNamedBucket_Path asserts that creating and changing buckets are tracked
|
||||
// properly by the namedBucket wrapper.
|
||||
func TestNamedBucket_Path(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
parentBktName, childBktName := []byte("root"), []byte("child")
|
||||
parentKey, parentVal := []byte("pkey"), []byte("pval")
|
||||
childKey, childVal := []byte("ckey"), []byte("cval")
|
||||
|
||||
require.NoError(db.Update(func(tx *bolt.Tx) error {
|
||||
// Trying to open a named bucket from a nonexistent bucket
|
||||
// should return nil.
|
||||
require.Nil(newNamedBucket(tx, []byte("nonexistent")))
|
||||
|
||||
// Creating a named bucket from a bolt tx should work and set
|
||||
// the path and name properly.
|
||||
b, err := createNamedBucketIfNotExists(tx, parentBktName)
|
||||
require.NoError(err)
|
||||
require.Equal(parentBktName, b.Name())
|
||||
require.Equal(parentBktName, b.Path())
|
||||
|
||||
// Trying to descend into a nonexistent bucket should return
|
||||
// nil.
|
||||
require.Nil(b.Bucket([]byte("nonexistent")))
|
||||
|
||||
// Descending into a new bucket should update the path.
|
||||
childBkt, err := b.CreateBucket(childBktName)
|
||||
require.NoError(err)
|
||||
require.Equal(childBktName, childBkt.(*namedBucket).Name())
|
||||
require.Equal([]byte("root/child"), childBkt.Path())
|
||||
|
||||
// Assert the parent bucket did not get changed.
|
||||
require.Equal(parentBktName, b.Name())
|
||||
require.Equal(parentBktName, b.Path())
|
||||
|
||||
// Add entries to both buckets
|
||||
require.NoError(b.Put(parentKey, parentVal))
|
||||
require.NoError(childBkt.Put(childKey, childVal))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Read buckets and values back out
|
||||
require.NoError(db.View(func(tx *bolt.Tx) error {
|
||||
b := newNamedBucket(tx, parentBktName)
|
||||
require.NotNil(b)
|
||||
require.Equal(parentVal, b.Get(parentKey))
|
||||
require.Nil(b.Get(childKey))
|
||||
|
||||
childBkt := b.Bucket(childBktName)
|
||||
require.NotNil(childBkt)
|
||||
require.Nil(childBkt.Get(parentKey))
|
||||
require.Equal(childVal, childBkt.Get(childKey))
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
// TestNamedBucket_DeleteBucket asserts that deleting a bucket properly purges
|
||||
// all related keys from the internal hashes map.
|
||||
func TestNamedBucket_DeleteBucket(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create some nested buckets and keys (key values will just be their names)
|
||||
b1Name, c1Name, c2Name, c1c1Name := []byte("b1"), []byte("c1"), []byte("c2"), []byte("c1c1")
|
||||
b1k1, c1k1, c2k1, c1c1k1 := []byte("b1k1"), []byte("c1k1"), []byte("c2k1"), []byte("c1c1k1")
|
||||
|
||||
codec := newKeyValueCodec()
|
||||
|
||||
// Create initial db state
|
||||
require.NoError(db.Update(func(tx *bolt.Tx) error {
|
||||
// Create bucket 1 and key
|
||||
b1, err := createNamedBucketIfNotExists(tx, b1Name)
|
||||
require.NoError(err)
|
||||
require.NoError(codec.Put(b1, b1k1, b1k1))
|
||||
|
||||
// Create child bucket 1 and key
|
||||
c1, err := b1.CreateBucketIfNotExists(c1Name)
|
||||
require.NoError(err)
|
||||
require.NoError(codec.Put(c1, c1k1, c1k1))
|
||||
|
||||
// Create child-child bucket 1 and key
|
||||
c1c1, err := c1.CreateBucketIfNotExists(c1c1Name)
|
||||
require.NoError(err)
|
||||
require.NoError(codec.Put(c1c1, c1c1k1, c1c1k1))
|
||||
|
||||
// Create child bucket 2 and key
|
||||
c2, err := b1.CreateBucketIfNotExists(c2Name)
|
||||
require.NoError(err)
|
||||
require.NoError(codec.Put(c2, c2k1, c2k1))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// codec should be tracking 4 hash buckets (b1, c1, c2, c1c1)
|
||||
require.Len(codec.hashes, 4)
|
||||
|
||||
// Delete c1
|
||||
require.NoError(db.Update(func(tx *bolt.Tx) error {
|
||||
b1 := newNamedBucket(tx, b1Name)
|
||||
return codec.DeleteBucket(b1, c1Name)
|
||||
}))
|
||||
|
||||
START HERE // We don't appear to be properly deleting the sub-bucket
|
||||
// codec should be tracking 2 hash buckets (b1, c2)
|
||||
require.Len(codec.hashes, 2)
|
||||
|
||||
// Assert all of c1 is gone
|
||||
require.NoError(db.View(func(tx *bolt.Tx) error {
|
||||
return nil
|
||||
}))
|
||||
}
|
|
@ -4,8 +4,8 @@ import (
|
|||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
trstate "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
|
||||
"github.com/hashicorp/nomad/helper/boltdd"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
|
@ -57,20 +57,18 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc {
|
|||
// methods are safe for concurrent access. Create via NewStateDB by setting
|
||||
// devMode=false.
|
||||
type BoltStateDB struct {
|
||||
db *bolt.DB
|
||||
codec *keyValueCodec
|
||||
db *boltdd.DB
|
||||
}
|
||||
|
||||
func NewBoltStateDB(stateDir string) (StateDB, error) {
|
||||
// Create or open the boltdb state database
|
||||
db, err := bolt.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
|
||||
db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create state database: %v", err)
|
||||
}
|
||||
|
||||
sdb := &BoltStateDB{
|
||||
db: db,
|
||||
codec: newKeyValueCodec(),
|
||||
db: db,
|
||||
}
|
||||
return sdb, nil
|
||||
}
|
||||
|
@ -83,7 +81,7 @@ func NewBoltStateDB(stateDir string) (StateDB, error) {
|
|||
func (s *BoltStateDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
|
||||
var allocs []*structs.Allocation
|
||||
var errs map[string]error
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
err := s.db.View(func(tx *boltdd.Tx) error {
|
||||
allocs, errs = s.getAllAllocations(tx)
|
||||
return nil
|
||||
})
|
||||
|
@ -101,8 +99,8 @@ type allocEntry struct {
|
|||
Alloc *structs.Allocation
|
||||
}
|
||||
|
||||
func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map[string]error) {
|
||||
allocationsBkt := newNamedBucket(tx, allocationsBucket)
|
||||
func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, map[string]error) {
|
||||
allocationsBkt := tx.Bucket(allocationsBucket)
|
||||
if allocationsBkt == nil {
|
||||
// No allocs
|
||||
return nil, nil
|
||||
|
@ -112,7 +110,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map
|
|||
errs := map[string]error{}
|
||||
|
||||
// Create a cursor for iteration.
|
||||
c := allocationsBkt.bkt.Cursor()
|
||||
c := allocationsBkt.BoltBucket().Cursor()
|
||||
|
||||
// Iterate over all the allocation buckets
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
|
@ -124,7 +122,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map
|
|||
}
|
||||
|
||||
var allocState allocEntry
|
||||
if err := s.codec.Get(allocBkt, allocKey, &allocState); err != nil {
|
||||
if err := allocBkt.Get(allocKey, &allocState); err != nil {
|
||||
errs[allocID] = fmt.Errorf("failed to decode alloc %v", err)
|
||||
continue
|
||||
}
|
||||
|
@ -137,9 +135,9 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map
|
|||
|
||||
// PutAllocation stores an allocation or returns an error.
|
||||
func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root allocations bucket
|
||||
allocsBkt, err := createNamedBucketIfNotExists(tx, allocationsBucket)
|
||||
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -154,7 +152,7 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
|
|||
allocState := allocEntry{
|
||||
Alloc: alloc,
|
||||
}
|
||||
return s.codec.Put(allocBkt, allocKey, &allocState)
|
||||
return allocBkt.Put(allocKey, &allocState)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -164,7 +162,7 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
|
|||
var ls trstate.LocalState
|
||||
var ts structs.TaskState
|
||||
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
err := s.db.View(func(tx *boltdd.Tx) error {
|
||||
bkt, err := getTaskBucket(tx, allocID, taskName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get task %q bucket: %v", taskName, err)
|
||||
|
@ -172,12 +170,12 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
|
|||
|
||||
// Restore Local State
|
||||
//XXX set persisted hash to avoid immediate write on first use?
|
||||
if err := s.codec.Get(bkt, taskLocalStateKey, &ls); err != nil {
|
||||
if err := bkt.Get(taskLocalStateKey, &ls); err != nil {
|
||||
return fmt.Errorf("failed to read local task runner state: %v", err)
|
||||
}
|
||||
|
||||
// Restore Task State
|
||||
if err := s.codec.Get(bkt, taskStateKey, &ts); err != nil {
|
||||
if err := bkt.Get(taskStateKey, &ts); err != nil {
|
||||
return fmt.Errorf("failed to read task state: %v", err)
|
||||
}
|
||||
|
||||
|
@ -199,13 +197,13 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
|
|||
// PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error.
|
||||
// It is up to the caller to serialize the state to bytes.
|
||||
func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
taskBkt, err := getTaskBucket(tx, allocID, taskName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
|
||||
}
|
||||
|
||||
if err := s.codec.Put(taskBkt, taskLocalStateKey, val); err != nil {
|
||||
if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
|
||||
return fmt.Errorf("failed to write task_runner state: %v", err)
|
||||
}
|
||||
|
||||
|
@ -215,21 +213,21 @@ func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val inte
|
|||
|
||||
// PutTaskState stores a task's state or returns an error.
|
||||
func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
taskBkt, err := getTaskBucket(tx, allocID, taskName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
|
||||
}
|
||||
|
||||
return s.codec.Put(taskBkt, taskStateKey, state)
|
||||
return taskBkt.Put(taskStateKey, state)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteTaskBucket is used to delete a task bucket if it exists.
|
||||
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root allocations bucket
|
||||
allocations := newNamedBucket(tx, allocationsBucket)
|
||||
allocations := tx.Bucket(allocationsBucket)
|
||||
if allocations == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -242,21 +240,21 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
|
|||
|
||||
// Check if the bucket exists
|
||||
key := []byte(taskName)
|
||||
return s.codec.DeleteBucket(alloc, key)
|
||||
return alloc.DeleteBucket(key)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteAllocationBucket is used to delete an allocation bucket if it exists.
|
||||
func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.db.Update(func(tx *boltdd.Tx) error {
|
||||
// Retrieve the root allocations bucket
|
||||
allocations := newNamedBucket(tx, allocationsBucket)
|
||||
allocations := tx.Bucket(allocationsBucket)
|
||||
if allocations == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
key := []byte(allocID)
|
||||
return s.codec.DeleteBucket(allocations, key)
|
||||
return allocations.DeleteBucket(key)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -270,18 +268,18 @@ func (s *BoltStateDB) Close() error {
|
|||
// particular allocation. If the root allocation bucket or the specific
|
||||
// allocation bucket doesn't exist, it will be created as long as the
|
||||
// transaction is writable.
|
||||
func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) {
|
||||
func getAllocationBucket(tx *boltdd.Tx, allocID string) (*boltdd.Bucket, error) {
|
||||
var err error
|
||||
w := tx.Writable()
|
||||
|
||||
// Retrieve the root allocations bucket
|
||||
allocations := newNamedBucket(tx, allocationsBucket)
|
||||
allocations := tx.Bucket(allocationsBucket)
|
||||
if allocations == nil {
|
||||
if !w {
|
||||
return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable")
|
||||
}
|
||||
|
||||
allocations, err = createNamedBucketIfNotExists(tx, allocationsBucket)
|
||||
allocations, err = tx.CreateBucketIfNotExists(allocationsBucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -308,7 +306,7 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) {
|
|||
// particular task. If the root allocation bucket, the specific
|
||||
// allocation or task bucket doesn't exist, they will be created as long as the
|
||||
// transaction is writable.
|
||||
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (kvStore, error) {
|
||||
func getTaskBucket(tx *boltdd.Tx, allocID, taskName string) (*boltdd.Bucket, error) {
|
||||
alloc, err := getAllocationBucket(tx, allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -0,0 +1,360 @@
|
|||
// boltdd contains a wrapper around BoltDB to deduplicate writes and encode
|
||||
// values using mgspack. (dd stands for DeDuplicate)
|
||||
package boltdd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/ugorji/go/codec"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
)
|
||||
|
||||
// DB wraps an underlying bolt.DB to create write deduplicating buckets and
|
||||
// msgpack encoded values.
|
||||
type DB struct {
|
||||
rootBuckets map[string]*bucketMeta
|
||||
rootBucketsLock sync.Mutex
|
||||
|
||||
bdb *bolt.DB
|
||||
}
|
||||
|
||||
// Open a bolt.DB and wrap it in a write-deduplicating msgpack-encoding
|
||||
// implementation.
|
||||
func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
|
||||
bdb, err := bolt.Open(path, mode, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &DB{
|
||||
rootBuckets: make(map[string]*bucketMeta),
|
||||
bdb: bdb,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
|
||||
bb := btx.Bucket(name)
|
||||
if bb == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
db.rootBucketsLock.Lock()
|
||||
defer db.rootBucketsLock.Unlock()
|
||||
|
||||
b, ok := db.rootBuckets[string(name)]
|
||||
if !ok {
|
||||
b = newBucketMeta()
|
||||
db.rootBuckets[string(name)] = b
|
||||
}
|
||||
|
||||
return newBucket(b, bb)
|
||||
}
|
||||
|
||||
func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
||||
bb, err := btx.CreateBucket(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db.rootBucketsLock.Lock()
|
||||
defer db.rootBucketsLock.Unlock()
|
||||
|
||||
// Always create a new Bucket since CreateBucket above fails if the
|
||||
// bucket already exists.
|
||||
b := newBucketMeta()
|
||||
db.rootBuckets[string(name)] = b
|
||||
|
||||
return newBucket(b, bb), nil
|
||||
}
|
||||
|
||||
func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error) {
|
||||
bb, err := btx.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db.rootBucketsLock.Lock()
|
||||
defer db.rootBucketsLock.Unlock()
|
||||
|
||||
b, ok := db.rootBuckets[string(name)]
|
||||
if !ok {
|
||||
b = newBucketMeta()
|
||||
db.rootBuckets[string(name)] = b
|
||||
}
|
||||
|
||||
return newBucket(b, bb), nil
|
||||
}
|
||||
|
||||
func (db *DB) Update(fn func(*Tx) error) error {
|
||||
return db.bdb.Update(func(btx *bolt.Tx) error {
|
||||
tx := newTx(db, btx)
|
||||
return fn(tx)
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) View(fn func(*Tx) error) error {
|
||||
return db.bdb.View(func(btx *bolt.Tx) error {
|
||||
tx := newTx(db, btx)
|
||||
return fn(tx)
|
||||
})
|
||||
}
|
||||
|
||||
// Close closes the underlying bolt.DB and clears all bucket hashes. DB is
|
||||
// unusable after closing.
|
||||
func (db *DB) Close() error {
|
||||
db.rootBuckets = nil
|
||||
return db.bdb.Close()
|
||||
}
|
||||
|
||||
// BoltDB returns the underlying bolt.DB.
|
||||
func (db *DB) BoltDB() *bolt.DB {
|
||||
return db.bdb
|
||||
}
|
||||
|
||||
type Tx struct {
|
||||
db *DB
|
||||
btx *bolt.Tx
|
||||
}
|
||||
|
||||
func newTx(db *DB, btx *bolt.Tx) *Tx {
|
||||
return &Tx{
|
||||
db: db,
|
||||
btx: btx,
|
||||
}
|
||||
}
|
||||
|
||||
// Bucket returns a root bucket or nil if it doesn't exist.
|
||||
func (tx *Tx) Bucket(name []byte) *Bucket {
|
||||
return tx.db.bucket(tx.btx, name)
|
||||
}
|
||||
|
||||
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
|
||||
return tx.db.createBucket(tx.btx, name)
|
||||
}
|
||||
|
||||
// CreateBucketIfNotExists returns a root bucket or creates a new one if it
|
||||
// doesn't already exist.
|
||||
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
|
||||
return tx.db.createBucketIfNotExists(tx.btx, name)
|
||||
}
|
||||
|
||||
// Writable wraps boltdb Tx.Writable.
|
||||
func (tx *Tx) Writable() bool {
|
||||
return tx.btx.Writable()
|
||||
}
|
||||
|
||||
// BoltTx returns the unerlying bolt.Tx
|
||||
func (tx *Tx) BoltTx() *bolt.Tx {
|
||||
return tx.btx
|
||||
}
|
||||
|
||||
// bucketMeta persists metadata -- such as key hashes and child buckets --
|
||||
// about boltdb Buckets across transactions.
|
||||
type bucketMeta struct {
|
||||
// hashes holds all of the value hashes for keys in this bucket
|
||||
hashes map[string][]byte
|
||||
hashesLock sync.Mutex
|
||||
|
||||
// buckets holds all of the child buckets
|
||||
buckets map[string]*bucketMeta
|
||||
bucketsLock sync.Mutex
|
||||
}
|
||||
|
||||
func newBucketMeta() *bucketMeta {
|
||||
return &bucketMeta{
|
||||
hashes: make(map[string][]byte),
|
||||
buckets: make(map[string]*bucketMeta),
|
||||
}
|
||||
}
|
||||
|
||||
// getHash of last value written to a key or nil if no hash exists.
|
||||
func (bm *bucketMeta) getHash(hashKey string) []byte {
|
||||
bm.hashesLock.Lock()
|
||||
lastHash := bm.hashes[hashKey]
|
||||
bm.hashesLock.Unlock()
|
||||
return lastHash
|
||||
}
|
||||
|
||||
// setHash of last value written to key.
|
||||
func (bm *bucketMeta) setHash(hashKey string, hashVal []byte) {
|
||||
bm.hashesLock.Lock()
|
||||
bm.hashes[hashKey] = hashVal
|
||||
bm.hashesLock.Unlock()
|
||||
}
|
||||
|
||||
// delHash deletes a hash value or does nothing if the hash key does not exist.
|
||||
func (bm *bucketMeta) delHash(hashKey string) {
|
||||
bm.hashesLock.Lock()
|
||||
delete(bm.hashes, hashKey)
|
||||
bm.hashesLock.Unlock()
|
||||
}
|
||||
|
||||
// createBucket metadata entry for the given nested bucket. Overwrites any
|
||||
// existing entry so caller should ensure bucket does not already exist.
|
||||
func (bm *bucketMeta) createBucket(name []byte) *bucketMeta {
|
||||
bm.bucketsLock.Lock()
|
||||
defer bm.bucketsLock.Unlock()
|
||||
|
||||
// Always create a new Bucket since CreateBucket above fails if the
|
||||
// bucket already exists.
|
||||
b := newBucketMeta()
|
||||
bm.buckets[string(name)] = b
|
||||
return b
|
||||
}
|
||||
|
||||
// deleteBucket metadata entry for the given nested bucket. Does nothing if
|
||||
// nested bucket metadata does not exist.
|
||||
func (bm *bucketMeta) deleteBucket(name []byte) {
|
||||
bm.bucketsLock.Lock()
|
||||
delete(bm.buckets, string(name))
|
||||
bm.bucketsLock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
// getOrCreateBucket metadata entry for the given nested bucket.
|
||||
func (bm *bucketMeta) getOrCreateBucket(name []byte) *bucketMeta {
|
||||
bm.bucketsLock.Lock()
|
||||
defer bm.bucketsLock.Unlock()
|
||||
|
||||
b, ok := bm.buckets[string(name)]
|
||||
if !ok {
|
||||
b = newBucketMeta()
|
||||
bm.buckets[string(name)] = b
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
type Bucket struct {
|
||||
bm *bucketMeta
|
||||
boltBucket *bolt.Bucket
|
||||
}
|
||||
|
||||
// newBucket creates a new view into a bucket backed by a boltdb
|
||||
// transaction.
|
||||
func newBucket(b *bucketMeta, bb *bolt.Bucket) *Bucket {
|
||||
return &Bucket{
|
||||
bm: b,
|
||||
boltBucket: bb,
|
||||
}
|
||||
}
|
||||
|
||||
// Put into boltdb iff it has changed since the last write.
|
||||
func (b *Bucket) Put(key []byte, val interface{}) error {
|
||||
// buffer for writing serialized state to
|
||||
var buf bytes.Buffer
|
||||
|
||||
// Serialize the object
|
||||
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(val); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
// Hash for skipping unnecessary writes
|
||||
hashKey := string(key)
|
||||
hashVal := blake2b.Sum256(buf.Bytes())
|
||||
|
||||
// lastHash value or nil if it hasn't been hashed yet
|
||||
lastHash := b.bm.getHash(hashKey)
|
||||
|
||||
// If the hashes are equal, skip the write
|
||||
if bytes.Equal(hashVal[:], lastHash) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// New value: write it to the underlying boltdb
|
||||
if err := b.boltBucket.Put(key, buf.Bytes()); err != nil {
|
||||
return fmt.Errorf("failed to write data at key %s: %v", key, err)
|
||||
}
|
||||
|
||||
// New value written, store hash (bucket path map was created above)
|
||||
b.bm.setHash(hashKey, hashVal[:])
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// Get value by key from boltdb or return an error if key not found.
|
||||
func (b *Bucket) Get(key []byte, obj interface{}) error {
|
||||
// Get the raw data from the underlying boltdb
|
||||
data := b.boltBucket.Get(key)
|
||||
if data == nil {
|
||||
return fmt.Errorf("no data at key %v", string(key))
|
||||
}
|
||||
|
||||
// Deserialize the object
|
||||
if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil {
|
||||
return fmt.Errorf("failed to decode data into passed object: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes a key from the bucket. If the key does not exist then nothing
|
||||
// is done and a nil error is returned. Returns an error if the bucket was
|
||||
// created from a read-only transaction.
|
||||
func (b *Bucket) Delete(key []byte) error {
|
||||
err := b.boltBucket.Delete(key)
|
||||
b.bm.delHash(string(key))
|
||||
return err
|
||||
}
|
||||
|
||||
// Bucket represents a boltdb Bucket and its associated metadata necessary for
|
||||
// write deduplication. Like bolt.Buckets it is only valid for the duration of
|
||||
// the transaction that created it.
|
||||
func (b *Bucket) Bucket(name []byte) *Bucket {
|
||||
bb := b.boltBucket.Bucket(name)
|
||||
if bb == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
bmeta := b.bm.getOrCreateBucket(name)
|
||||
return newBucket(bmeta, bb)
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket at the given key and returns the new
|
||||
// bucket. Returns an error if the key already exists, if the bucket name is
|
||||
// blank, or if the bucket name is too long. The bucket instance is only valid
|
||||
// for the lifetime of the transaction.
|
||||
func (b *Bucket) CreateBucket(name []byte) (*Bucket, error) {
|
||||
bb, err := b.boltBucket.CreateBucket(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bmeta := b.bm.createBucket(name)
|
||||
return newBucket(bmeta, bb), nil
|
||||
}
|
||||
|
||||
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and
|
||||
// returns a reference to it. The bucket instance is only valid for the
|
||||
// lifetime of the transaction.
|
||||
func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
|
||||
bb, err := b.boltBucket.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bmeta := b.bm.getOrCreateBucket(name)
|
||||
return newBucket(bmeta, bb), nil
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a child bucket. Returns an error if the bucket does not
|
||||
// exist or corresponds to a non-bucket key.
|
||||
func (b *Bucket) DeleteBucket(name []byte) error {
|
||||
// Delete the bucket from the underlying boltdb
|
||||
err := b.boltBucket.DeleteBucket(name)
|
||||
|
||||
// Remove reference to child bucket
|
||||
b.bm.deleteBucket(name)
|
||||
return err
|
||||
}
|
||||
|
||||
// BoltBucket returns the internal bolt.Bucket for this Bucket. Only valid
|
||||
// for the duration of the current transaction.
|
||||
func (b *Bucket) BoltBucket() *bolt.Bucket {
|
||||
return b.boltBucket
|
||||
}
|
|
@ -0,0 +1,344 @@
|
|||
package boltdd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
type testingT interface {
|
||||
Fatalf(format string, args ...interface{})
|
||||
Logf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
func setupBoltDB(t testingT) (*DB, func()) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest_")
|
||||
if err != nil {
|
||||
t.Fatalf("error creating tempdir: %v", err)
|
||||
}
|
||||
|
||||
cleanup := func() {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Logf("error removing test dir: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
dbFilename := filepath.Join(dir, "nomadtest.db")
|
||||
db, err := Open(dbFilename, 0600, nil)
|
||||
if err != nil {
|
||||
cleanup()
|
||||
t.Fatalf("error creating boltdb: %v", err)
|
||||
}
|
||||
|
||||
return db, func() {
|
||||
db.Close()
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func TestDB_Open(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
require.Equal(0, db.BoltDB().Stats().TxStats.Write)
|
||||
}
|
||||
|
||||
func TestBucket_Create(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
name := []byte("create_test")
|
||||
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
// Trying to get a nonexistent bucket should return nil
|
||||
require.Nil(tx.Bucket(name))
|
||||
|
||||
// Creating a nonexistent bucket should work
|
||||
b, err := tx.CreateBucket(name)
|
||||
require.NoError(err)
|
||||
require.NotNil(b)
|
||||
|
||||
// Recreating a bucket that exists should fail
|
||||
b, err = tx.CreateBucket(name)
|
||||
require.Error(err)
|
||||
require.Nil(b)
|
||||
|
||||
// get or create should work
|
||||
b, err = tx.CreateBucketIfNotExists(name)
|
||||
require.NoError(err)
|
||||
require.NotNil(b)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Bucket should be visible
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
require.NotNil(tx.Bucket(name))
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func TestBucket_DedupeWrites(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
bname := []byte("dedupewrites_test")
|
||||
k1name := []byte("k1")
|
||||
k2name := []byte("k2")
|
||||
|
||||
// Put 2 keys
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
b, err := tx.CreateBucket(bname)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(b.Put(k1name, k1name))
|
||||
require.NoError(b.Put(k2name, k2name))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Assert there was at least 1 write
|
||||
origWrites := db.BoltDB().Stats().TxStats.Write
|
||||
require.NotZero(origWrites)
|
||||
|
||||
// Write the same values again and expect no new writes
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
b := tx.Bucket(bname)
|
||||
require.NoError(b.Put(k1name, k1name))
|
||||
require.NoError(b.Put(k2name, k2name))
|
||||
return nil
|
||||
}))
|
||||
|
||||
putWrites := db.BoltDB().Stats().TxStats.Write
|
||||
|
||||
// Unforunately every committed transaction causes two writes, so this
|
||||
// only saves 1 write operation
|
||||
require.Equal(origWrites+2, putWrites)
|
||||
|
||||
// Write new values and assert more writes took place
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
b := tx.Bucket(bname)
|
||||
require.NoError(b.Put(k1name, []byte("newval1")))
|
||||
require.NoError(b.Put(k2name, []byte("newval2")))
|
||||
return nil
|
||||
}))
|
||||
|
||||
putWrites2 := db.BoltDB().Stats().TxStats.Write
|
||||
|
||||
// Expect 3 additional writes: 2 for the transaction and one for the
|
||||
// dirty page
|
||||
require.Equal(putWrites+3, putWrites2)
|
||||
}
|
||||
|
||||
func TestBucket_Delete(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
db, cleanup := setupBoltDB(t)
|
||||
defer cleanup()
|
||||
|
||||
parentName := []byte("delete_test")
|
||||
parentKey := []byte("parent_key")
|
||||
childName := []byte("child")
|
||||
childKey := []byte("child_key")
|
||||
grandchildName1 := []byte("grandchild1")
|
||||
grandchildKey1 := []byte("grandchild_key1")
|
||||
grandchildName2 := []byte("grandchild2")
|
||||
grandchildKey2 := []byte("grandchild_key2")
|
||||
|
||||
// Create a parent bucket with 1 child and 2 grandchildren
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
pb, err := tx.CreateBucket(parentName)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(pb.Put(parentKey, parentKey))
|
||||
|
||||
child, err := pb.CreateBucket(childName)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(child.Put(childKey, childKey))
|
||||
|
||||
grandchild1, err := child.CreateBucket(grandchildName1)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(grandchild1.Put(grandchildKey1, grandchildKey1))
|
||||
|
||||
grandchild2, err := child.CreateBucket(grandchildName2)
|
||||
require.NoError(err)
|
||||
|
||||
require.NoError(grandchild2.Put(grandchildKey2, grandchildKey2))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Verify grandchild keys wrote
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
|
||||
var v1 []byte
|
||||
grandchild1.Get(grandchildKey1, &v1)
|
||||
require.Equal(grandchildKey1, v1)
|
||||
|
||||
grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
|
||||
var v2 []byte
|
||||
grandchild2.Get(grandchildKey2, &v2)
|
||||
require.Equal(grandchildKey2, v2)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Delete grandchildKey1 and grandchild2
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
child := tx.Bucket(parentName).Bucket(childName)
|
||||
|
||||
require.NoError(child.DeleteBucket(grandchildName2))
|
||||
|
||||
grandchild1 := child.Bucket(grandchildName1)
|
||||
require.NoError(grandchild1.Delete(grandchildKey1))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Ensure grandchild2 alone was deleted
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
|
||||
var v1 []byte
|
||||
grandchild1.Get(grandchildKey1, &v1)
|
||||
require.Equal(([]byte)(nil), v1)
|
||||
|
||||
grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
|
||||
require.Nil(grandchild2)
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Deleting child bucket should delete grandchild1 as well
|
||||
require.NoError(db.Update(func(tx *Tx) error {
|
||||
parent := tx.Bucket(parentName)
|
||||
require.NoError(parent.DeleteBucket(childName))
|
||||
|
||||
// Recreate child bucket and ensure childKey and grandchild are gone
|
||||
child, err := parent.CreateBucket(childName)
|
||||
require.NoError(err)
|
||||
|
||||
var v []byte
|
||||
require.Error(child.Get(childKey, &v))
|
||||
require.Equal(([]byte)(nil), v)
|
||||
|
||||
require.Nil(child.Bucket(grandchildName1))
|
||||
|
||||
// Rewrite childKey1 to make sure it doesn't get dedupe incorrectly
|
||||
require.NoError(child.Put(childKey, childKey))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Ensure childKey1 was rewritten and not deduped incorrectly
|
||||
require.NoError(db.View(func(tx *Tx) error {
|
||||
var v []byte
|
||||
require.NoError(tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
|
||||
require.Equal(childKey, v)
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func BenchmarkWriteDeduplication_On(b *testing.B) {
|
||||
db, cleanup := setupBoltDB(b)
|
||||
defer cleanup()
|
||||
|
||||
bucketName := []byte("allocations")
|
||||
alloc := mock.Alloc()
|
||||
allocID := []byte(alloc.ID)
|
||||
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
allocs, err := tx.CreateBucket(bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return allocs.Put(allocID, alloc)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error setting up: %v", err)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
return tx.Bucket(bucketName).Put(allocID, alloc)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error at runtime: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWriteDeduplication_Off(b *testing.B) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest_")
|
||||
if err != nil {
|
||||
b.Fatalf("error creating tempdir: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
b.Logf("error removing test dir: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
dbFilename := filepath.Join(dir, "nomadtest.db")
|
||||
db, err := Open(dbFilename, 0600, nil)
|
||||
if err != nil {
|
||||
b.Fatalf("error creating boltdb: %v", err)
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
bucketName := []byte("allocations")
|
||||
alloc := mock.Alloc()
|
||||
allocID := []byte(alloc.ID)
|
||||
|
||||
err = db.Update(func(tx *Tx) error {
|
||||
allocs, err := tx.CreateBucket(bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
return allocs.Put(allocID, buf)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error setting up: %v", err)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := db.Update(func(tx *Tx) error {
|
||||
var buf bytes.Buffer
|
||||
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
|
||||
return fmt.Errorf("failed to encode passed object: %v", err)
|
||||
}
|
||||
|
||||
return tx.Bucket(bucketName).Put(allocID, buf)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
b.Fatalf("error at runtime: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue