From 820af27171c58076fdbbe46926f120004a1e053c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 16 Aug 2018 11:14:52 -0700 Subject: [PATCH] wrap boltdb in a write deduplicator Saves a tiny bit of cpu and some IO. Sadly doesn't prevent all IO on duplicate writes as the transactions are still created and committed. $ go test -bench=. -benchmem goos: linux goarch: amd64 pkg: github.com/hashicorp/nomad/helper/boltdd BenchmarkWriteDeduplication_On-4 500 4059591 ns/op 23736 B/op 56 allocs/op BenchmarkWriteDeduplication_Off-4 300 4115319 ns/op 25942 B/op 55 allocs/op --- client/allocrunner/alloc_runner.go | 18 +- client/allocrunner/taskrunner/task_runner.go | 19 +- client/state/kvcodec.go | 132 ------- client/state/kvcodec_test.go | 80 ----- client/state/named_bucket.go | 123 ------- client/state/named_bucket_test.go | 151 -------- client/state/state_database.go | 60 ++-- helper/boltdd/boltdd.go | 360 +++++++++++++++++++ helper/boltdd/boltdd_test.go | 344 ++++++++++++++++++ 9 files changed, 752 insertions(+), 535 deletions(-) delete mode 100644 client/state/kvcodec.go delete mode 100644 client/state/kvcodec_test.go delete mode 100644 client/state/named_bucket.go delete mode 100644 client/state/named_bucket_test.go create mode 100644 helper/boltdd/boltdd.go create mode 100644 helper/boltdd/boltdd_test.go diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 9bf22149e..b60432332 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" - "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" @@ -432,15 +431,16 @@ func (r *AllocRunner) saveAllocRunnerState() error { // DestroyState is used to cleanup after 
ourselves func (r *AllocRunner) DestroyState() error { - r.allocStateLock.Lock() - defer r.allocStateLock.Unlock() + //r.allocStateLock.Lock() + //defer r.allocStateLock.Unlock() - return r.stateDB.Update(func(tx *bolt.Tx) error { - if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil { - return fmt.Errorf("failed to delete allocation bucket: %v", err) - } - return nil - }) + //return r.stateDB.Update(func(tx *bolt.Tx) error { + // if err := state.DeleteAllocationBucket(tx, r.allocID); err != nil { + // return fmt.Errorf("failed to delete allocation bucket: %v", err) + // } + // return nil + //}) + panic("deprecated: use allocrunnerv2") } // DestroyContext is used to destroy the context diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 60032e60d..386fb672a 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -26,7 +26,6 @@ import ( "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/driver" - "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" @@ -536,15 +535,17 @@ func (r *TaskRunner) SaveState() error { // DestroyState is used to cleanup after ourselves func (r *TaskRunner) DestroyState() error { - r.persistLock.Lock() - defer r.persistLock.Unlock() + //r.persistLock.Lock() + //defer r.persistLock.Unlock() - return r.stateDB.Update(func(tx *bolt.Tx) error { - if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { - return fmt.Errorf("failed to delete task bucket: %v", err) - } - return nil - }) + //return r.stateDB.Update(func(tx *bolt.Tx) error { + // if err := state.DeleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil { + // return fmt.Errorf("failed to delete task bucket: %v", err) + // } + // return nil + //}) + //XXX 
Deprecated: see allocrunnerv2 + panic("deprecated") } // setState is used to update the state of the task runner diff --git a/client/state/kvcodec.go b/client/state/kvcodec.go deleted file mode 100644 index 61bca1a4f..000000000 --- a/client/state/kvcodec.go +++ /dev/null @@ -1,132 +0,0 @@ -package state - -import ( - "bytes" - "fmt" - "io" - "sync" - - "github.com/hashicorp/nomad/nomad/structs" - "github.com/ugorji/go/codec" - "golang.org/x/crypto/blake2b" -) - -type kvStore interface { - Path() []byte - Bucket(name []byte) kvStore - CreateBucket(key []byte) (kvStore, error) - CreateBucketIfNotExists(key []byte) (kvStore, error) - DeleteBucket(key []byte) error - Get(key []byte) (val []byte) - Put(key, val []byte) error -} - -// keyValueCodec handles encoding and decoding values from a key/value store -// such as boltdb. -type keyValueCodec struct { - // hashes maps buckets to keys to the hash of the last content written: - // bucket -> key -> hash for example: - // allocations/1234 -> alloc -> abcd - // allocations/1234/redis -> task_state -> efff - hashes map[string]map[string][]byte - hashesLock sync.Mutex -} - -func newKeyValueCodec() *keyValueCodec { - return &keyValueCodec{ - hashes: make(map[string]map[string][]byte), - } -} - -// Put into kv store iff it has changed since the last write. A globally -// unique key is constructed for each value by concatinating the path and key -// passed in. -func (c *keyValueCodec) Put(bkt kvStore, key []byte, val interface{}) error { - // buffer for writing serialized state to - var buf bytes.Buffer - - // Hash for skipping unnecessary writes - h, err := blake2b.New256(nil) - if err != nil { - // Programming error that should never happen! 
- return err - } - - // Multiplex writes to both hasher and buffer - w := io.MultiWriter(h, &buf) - - // Serialize the object - if err := codec.NewEncoder(w, structs.MsgpackHandle).Encode(val); err != nil { - return fmt.Errorf("failed to encode passed object: %v", err) - } - - // If the hashes are equal, skip the write - hashPath := string(bkt.Path()) - hashKey := string(key) - hashVal := h.Sum(nil) - - // lastHash value or nil if it hasn't been hashed yet - var lastHash []byte - - c.hashesLock.Lock() - if hashBkt, ok := c.hashes[hashPath]; ok { - lastHash = hashBkt[hashKey] - } else { - // Create hash bucket - c.hashes[hashPath] = make(map[string][]byte, 2) - } - c.hashesLock.Unlock() - - if bytes.Equal(hashVal, lastHash) { - return nil - } - - // New value: write it to the underlying store - if err := bkt.Put(key, buf.Bytes()); err != nil { - return fmt.Errorf("failed to write data at key %s: %v", key, err) - } - - // New value written, store hash (bucket path map was created above) - c.hashesLock.Lock() - c.hashes[hashPath][hashKey] = hashVal - c.hashesLock.Unlock() - - return nil - -} - -// Get value by key from boltdb. -func (c *keyValueCodec) Get(bkt kvStore, key []byte, obj interface{}) error { - // Get the data - data := bkt.Get(key) - if data == nil { - return fmt.Errorf("no data at key %v", string(key)) - } - - // Deserialize the object - if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil { - return fmt.Errorf("failed to decode data into passed object: %v", err) - } - - return nil -} - -// DeleteBucket or do nothing if bucket doesn't exist. -func (c *keyValueCodec) DeleteBucket(parent kvStore, bktName []byte) error { - // Get the path of the bucket being deleted - bkt := parent.Bucket(bktName) - if bkt == nil { - // Doesn't exist! 
Nothing to delete - return nil - } - - // Delete the bucket - err := parent.DeleteBucket(bktName) - - // Always purge all corresponding hashes to prevent memory leaks - c.hashesLock.Lock() - delete(c.hashes, string(bkt.Path())) - c.hashesLock.Unlock() - - return err -} diff --git a/client/state/kvcodec_test.go b/client/state/kvcodec_test.go deleted file mode 100644 index ee625d016..000000000 --- a/client/state/kvcodec_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package state - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -// mockKVStore tracks puts and is useful for testing KVCodec's write-on-change -// code. -type mockKVStore struct { - puts int -} - -func (mockKVStore) Path() []byte { - return []byte{} -} - -func (m *mockKVStore) Bucket(name []byte) kvStore { - return m -} - -func (m *mockKVStore) CreateBucket(key []byte) (kvStore, error) { - return m, nil -} - -func (m *mockKVStore) CreateBucketIfNotExists(key []byte) (kvStore, error) { - return m, nil -} - -func (m *mockKVStore) DeleteBucket(key []byte) error { - return nil -} - -func (mockKVStore) Get(key []byte) (val []byte) { - return nil -} - -func (m *mockKVStore) Put(key, val []byte) error { - m.puts++ - return nil -} - -// TestKVCodec_PutHash asserts that Puts on the underlying kvstore only occur -// when the data actually changes. 
-func TestKVCodec_PutHash(t *testing.T) { - require := require.New(t) - codec := newKeyValueCodec() - - // Create arguments for Put - kv := new(mockKVStore) - key := []byte("key1") - val := &struct { - Val int - }{ - Val: 1, - } - - // Initial Put should be written - require.NoError(codec.Put(kv, key, val)) - require.Equal(1, kv.puts) - - // Writing the same values again should be a noop - require.NoError(codec.Put(kv, key, val)) - require.Equal(1, kv.puts) - - // Changing the value should write again - val.Val++ - require.NoError(codec.Put(kv, key, val)) - require.Equal(2, kv.puts) - - // Changing the key should write again - key = []byte("key2") - require.NoError(codec.Put(kv, key, val)) - require.Equal(3, kv.puts) - - // Writing the same values again should be a noop - require.NoError(codec.Put(kv, key, val)) - require.Equal(3, kv.puts) -} diff --git a/client/state/named_bucket.go b/client/state/named_bucket.go deleted file mode 100644 index b5383e838..000000000 --- a/client/state/named_bucket.go +++ /dev/null @@ -1,123 +0,0 @@ -package state - -import "github.com/boltdb/bolt" - -// namedBucket is a wrapper around bolt.Bucket's to preserve their path -// information and expose it via the Path() method. -// -// Knowing the full bucket path to a key is necessary for tracking accesses in -// another datastructure such as the hashing writer keyValueCodec. -type namedBucket struct { - path []byte - name []byte - bkt *bolt.Bucket -} - -// newNamedBucket from a bolt transaction. -func newNamedBucket(tx *bolt.Tx, root []byte) *namedBucket { - b := tx.Bucket(root) - if b == nil { - return nil - } - - return &namedBucket{ - path: root, - name: root, - bkt: b, - } -} - -// createNamedBucketIfNotExists from a bolt transaction. 
-func createNamedBucketIfNotExists(tx *bolt.Tx, root []byte) (*namedBucket, error) { - b, err := tx.CreateBucketIfNotExists(root) - if err != nil { - return nil, err - } - - return &namedBucket{ - path: root, - name: root, - bkt: b, - }, nil -} - -// Path to this bucket (including this bucket). -func (n *namedBucket) Path() []byte { - return n.path -} - -// Name of this bucket. -func (n *namedBucket) Name() []byte { - return n.name -} - -// Bucket returns a bucket inside the current one or nil if the bucket does not -// exist. -func (n *namedBucket) Bucket(name []byte) kvStore { - b := n.bkt.Bucket(name) - if b == nil { - return nil - } - - return &namedBucket{ - path: n.chBkt(name), - name: name, - bkt: b, - } -} - -// CreateBucketIfNotExists creates a bucket if it doesn't exist and returns it -// or an error. -func (n *namedBucket) CreateBucketIfNotExists(name []byte) (kvStore, error) { - b, err := n.bkt.CreateBucketIfNotExists(name) - if err != nil { - return nil, err - } - - return &namedBucket{ - path: n.chBkt(name), - name: name, - bkt: b, - }, nil -} - -// CreateBucket creates a bucket and returns it. -func (n *namedBucket) CreateBucket(name []byte) (kvStore, error) { - b, err := n.bkt.CreateBucket(name) - if err != nil { - return nil, err - } - - return &namedBucket{ - path: n.chBkt(name), - name: name, - bkt: b, - }, nil -} - -// DeleteBucket calls DeleteBucket on the underlying bolt.Bucket. -func (n *namedBucket) DeleteBucket(name []byte) error { - return n.bkt.DeleteBucket(name) -} - -// Get calls Get on the underlying bolt.Bucket. -func (n *namedBucket) Get(key []byte) []byte { - return n.bkt.Get(key) -} - -// Put calls Put on the underlying bolt.Bucket. -func (n *namedBucket) Put(key, value []byte) error { - return n.bkt.Put(key, value) -} - -// chBkt is like chdir but for buckets: it appends the new name to the end of -// a copy of the path and returns it. 
-func (n *namedBucket) chBkt(name []byte) []byte { - // existing path + new path element + path separator - path := make([]byte, len(n.path)+len(name)+1) - copy(path[0:len(n.path)], n.path) - path[len(n.path)] = '/' - copy(path[len(n.path)+1:], name) - - return path -} diff --git a/client/state/named_bucket_test.go b/client/state/named_bucket_test.go deleted file mode 100644 index 705483a16..000000000 --- a/client/state/named_bucket_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package state - -import ( - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/boltdb/bolt" - "github.com/stretchr/testify/require" -) - -func setupBoltDB(t *testing.T) (*bolt.DB, func()) { - dir, err := ioutil.TempDir("", "nomadtest_") - require.NoError(t, err) - cleanup := func() { - if err := os.RemoveAll(dir); err != nil { - t.Logf("error removing test dir: %v", err) - } - } - - dbFilename := filepath.Join(dir, "nomadtest.db") - db, err := bolt.Open(dbFilename, 0600, nil) - if err != nil { - cleanup() - t.Fatalf("error creating boltdb: %v", err) - } - - return db, func() { - db.Close() - cleanup() - } -} - -// TestNamedBucket_Path asserts that creating and changing buckets are tracked -// properly by the namedBucket wrapper. -func TestNamedBucket_Path(t *testing.T) { - t.Parallel() - require := require.New(t) - - db, cleanup := setupBoltDB(t) - defer cleanup() - - parentBktName, childBktName := []byte("root"), []byte("child") - parentKey, parentVal := []byte("pkey"), []byte("pval") - childKey, childVal := []byte("ckey"), []byte("cval") - - require.NoError(db.Update(func(tx *bolt.Tx) error { - // Trying to open a named bucket from a nonexistent bucket - // should return nil. - require.Nil(newNamedBucket(tx, []byte("nonexistent"))) - - // Creating a named bucket from a bolt tx should work and set - // the path and name properly. 
- b, err := createNamedBucketIfNotExists(tx, parentBktName) - require.NoError(err) - require.Equal(parentBktName, b.Name()) - require.Equal(parentBktName, b.Path()) - - // Trying to descend into a nonexistent bucket should return - // nil. - require.Nil(b.Bucket([]byte("nonexistent"))) - - // Descending into a new bucket should update the path. - childBkt, err := b.CreateBucket(childBktName) - require.NoError(err) - require.Equal(childBktName, childBkt.(*namedBucket).Name()) - require.Equal([]byte("root/child"), childBkt.Path()) - - // Assert the parent bucket did not get changed. - require.Equal(parentBktName, b.Name()) - require.Equal(parentBktName, b.Path()) - - // Add entries to both buckets - require.NoError(b.Put(parentKey, parentVal)) - require.NoError(childBkt.Put(childKey, childVal)) - return nil - })) - - // Read buckets and values back out - require.NoError(db.View(func(tx *bolt.Tx) error { - b := newNamedBucket(tx, parentBktName) - require.NotNil(b) - require.Equal(parentVal, b.Get(parentKey)) - require.Nil(b.Get(childKey)) - - childBkt := b.Bucket(childBktName) - require.NotNil(childBkt) - require.Nil(childBkt.Get(parentKey)) - require.Equal(childVal, childBkt.Get(childKey)) - return nil - })) -} - -// TestNamedBucket_DeleteBucket asserts that deleting a bucket properly purges -// all related keys from the internal hashes map. 
-func TestNamedBucket_DeleteBucket(t *testing.T) { - t.Parallel() - require := require.New(t) - - db, cleanup := setupBoltDB(t) - defer cleanup() - - // Create some nested buckets and keys (key values will just be their names) - b1Name, c1Name, c2Name, c1c1Name := []byte("b1"), []byte("c1"), []byte("c2"), []byte("c1c1") - b1k1, c1k1, c2k1, c1c1k1 := []byte("b1k1"), []byte("c1k1"), []byte("c2k1"), []byte("c1c1k1") - - codec := newKeyValueCodec() - - // Create initial db state - require.NoError(db.Update(func(tx *bolt.Tx) error { - // Create bucket 1 and key - b1, err := createNamedBucketIfNotExists(tx, b1Name) - require.NoError(err) - require.NoError(codec.Put(b1, b1k1, b1k1)) - - // Create child bucket 1 and key - c1, err := b1.CreateBucketIfNotExists(c1Name) - require.NoError(err) - require.NoError(codec.Put(c1, c1k1, c1k1)) - - // Create child-child bucket 1 and key - c1c1, err := c1.CreateBucketIfNotExists(c1c1Name) - require.NoError(err) - require.NoError(codec.Put(c1c1, c1c1k1, c1c1k1)) - - // Create child bucket 2 and key - c2, err := b1.CreateBucketIfNotExists(c2Name) - require.NoError(err) - require.NoError(codec.Put(c2, c2k1, c2k1)) - return nil - })) - - // codec should be tracking 4 hash buckets (b1, c1, c2, c1c1) - require.Len(codec.hashes, 4) - - // Delete c1 - require.NoError(db.Update(func(tx *bolt.Tx) error { - b1 := newNamedBucket(tx, b1Name) - return codec.DeleteBucket(b1, c1Name) - })) - - START HERE // We don't appear to be properly deleting the sub-bucket - // codec should be tracking 2 hash buckets (b1, c2) - require.Len(codec.hashes, 2) - - // Assert all of c1 is gone - require.NoError(db.View(func(tx *bolt.Tx) error { - return nil - })) -} diff --git a/client/state/state_database.go b/client/state/state_database.go index 4e0e2d946..ff89f6dff 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -4,8 +4,8 @@ import ( "fmt" "path/filepath" - "github.com/boltdb/bolt" trstate 
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state" + "github.com/hashicorp/nomad/helper/boltdd" "github.com/hashicorp/nomad/nomad/structs" ) @@ -57,20 +57,18 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc { // methods are safe for concurrent access. Create via NewStateDB by setting // devMode=false. type BoltStateDB struct { - db *bolt.DB - codec *keyValueCodec + db *boltdd.DB } func NewBoltStateDB(stateDir string) (StateDB, error) { // Create or open the boltdb state database - db, err := bolt.Open(filepath.Join(stateDir, "state.db"), 0600, nil) + db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil) if err != nil { return nil, fmt.Errorf("failed to create state database: %v", err) } sdb := &BoltStateDB{ - db: db, - codec: newKeyValueCodec(), + db: db, } return sdb, nil } @@ -83,7 +81,7 @@ func NewBoltStateDB(stateDir string) (StateDB, error) { func (s *BoltStateDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { var allocs []*structs.Allocation var errs map[string]error - err := s.db.View(func(tx *bolt.Tx) error { + err := s.db.View(func(tx *boltdd.Tx) error { allocs, errs = s.getAllAllocations(tx) return nil }) @@ -101,8 +99,8 @@ type allocEntry struct { Alloc *structs.Allocation } -func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map[string]error) { - allocationsBkt := newNamedBucket(tx, allocationsBucket) +func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, map[string]error) { + allocationsBkt := tx.Bucket(allocationsBucket) if allocationsBkt == nil { // No allocs return nil, nil @@ -112,7 +110,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map errs := map[string]error{} // Create a cursor for iteration. 
- c := allocationsBkt.bkt.Cursor() + c := allocationsBkt.BoltBucket().Cursor() // Iterate over all the allocation buckets for k, _ := c.First(); k != nil; k, _ = c.Next() { @@ -124,7 +122,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map } var allocState allocEntry - if err := s.codec.Get(allocBkt, allocKey, &allocState); err != nil { + if err := allocBkt.Get(allocKey, &allocState); err != nil { errs[allocID] = fmt.Errorf("failed to decode alloc %v", err) continue } @@ -137,9 +135,9 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map // PutAllocation stores an allocation or returns an error. func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocsBkt, err := createNamedBucketIfNotExists(tx, allocationsBucket) + allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket) if err != nil { return err } @@ -154,7 +152,7 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { allocState := allocEntry{ Alloc: alloc, } - return s.codec.Put(allocBkt, allocKey, &allocState) + return allocBkt.Put(allocKey, &allocState) }) } @@ -164,7 +162,7 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc var ls trstate.LocalState var ts structs.TaskState - err := s.db.View(func(tx *bolt.Tx) error { + err := s.db.View(func(tx *boltdd.Tx) error { bkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { return fmt.Errorf("failed to get task %q bucket: %v", taskName, err) @@ -172,12 +170,12 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc // Restore Local State //XXX set persisted hash to avoid immediate write on first use? 
- if err := s.codec.Get(bkt, taskLocalStateKey, &ls); err != nil { + if err := bkt.Get(taskLocalStateKey, &ls); err != nil { return fmt.Errorf("failed to read local task runner state: %v", err) } // Restore Task State - if err := s.codec.Get(bkt, taskStateKey, &ts); err != nil { + if err := bkt.Get(taskStateKey, &ts); err != nil { return fmt.Errorf("failed to read task state: %v", err) } @@ -199,13 +197,13 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc // PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error. // It is up to the caller to serialize the state to bytes. func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val interface{}) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.db.Update(func(tx *boltdd.Tx) error { taskBkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - if err := s.codec.Put(taskBkt, taskLocalStateKey, val); err != nil { + if err := taskBkt.Put(taskLocalStateKey, val); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -215,21 +213,21 @@ func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val inte // PutTaskState stores a task's state or returns an error. func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.db.Update(func(tx *boltdd.Tx) error { taskBkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - return s.codec.Put(taskBkt, taskStateKey, state) + return taskBkt.Put(taskStateKey, state) }) } // DeleteTaskBucket is used to delete a task bucket if it exists. 
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocations := newNamedBucket(tx, allocationsBucket) + allocations := tx.Bucket(allocationsBucket) if allocations == nil { return nil } @@ -242,21 +240,21 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { // Check if the bucket exists key := []byte(taskName) - return s.codec.DeleteBucket(alloc, key) + return alloc.DeleteBucket(key) }) } // DeleteAllocationBucket is used to delete an allocation bucket if it exists. func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket - allocations := newNamedBucket(tx, allocationsBucket) + allocations := tx.Bucket(allocationsBucket) if allocations == nil { return nil } key := []byte(allocID) - return s.codec.DeleteBucket(allocations, key) + return allocations.DeleteBucket(key) }) } @@ -270,18 +268,18 @@ func (s *BoltStateDB) Close() error { // particular allocation. If the root allocation bucket or the specific // allocation bucket doesn't exist, it will be created as long as the // transaction is writable. 
-func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) { +func getAllocationBucket(tx *boltdd.Tx, allocID string) (*boltdd.Bucket, error) { var err error w := tx.Writable() // Retrieve the root allocations bucket - allocations := newNamedBucket(tx, allocationsBucket) + allocations := tx.Bucket(allocationsBucket) if allocations == nil { if !w { return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") } - allocations, err = createNamedBucketIfNotExists(tx, allocationsBucket) + allocations, err = tx.CreateBucketIfNotExists(allocationsBucket) if err != nil { return nil, err } @@ -308,7 +306,7 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) { // particular task. If the root allocation bucket, the specific // allocation or task bucket doesn't exist, they will be created as long as the // transaction is writable. -func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (kvStore, error) { +func getTaskBucket(tx *boltdd.Tx, allocID, taskName string) (*boltdd.Bucket, error) { alloc, err := getAllocationBucket(tx, allocID) if err != nil { return nil, err diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go new file mode 100644 index 000000000..0a9ec9897 --- /dev/null +++ b/helper/boltdd/boltdd.go @@ -0,0 +1,360 @@ +// boltdd contains a wrapper around BoltDB to deduplicate writes and encode +// values using msgpack. (dd stands for DeDuplicate) +package boltdd + +import ( + "bytes" + "fmt" + "os" + "sync" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" + "golang.org/x/crypto/blake2b" +) + +// DB wraps an underlying bolt.DB to create write deduplicating buckets and +// msgpack encoded values. +type DB struct { + rootBuckets map[string]*bucketMeta + rootBucketsLock sync.Mutex + + bdb *bolt.DB +} + +// Open a bolt.DB and wrap it in a write-deduplicating msgpack-encoding +// implementation.
+func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) { + bdb, err := bolt.Open(path, mode, options) + if err != nil { + return nil, err + } + + return &DB{ + rootBuckets: make(map[string]*bucketMeta), + bdb: bdb, + }, nil +} + +func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket { + bb := btx.Bucket(name) + if bb == nil { + return nil + } + + db.rootBucketsLock.Lock() + defer db.rootBucketsLock.Unlock() + + b, ok := db.rootBuckets[string(name)] + if !ok { + b = newBucketMeta() + db.rootBuckets[string(name)] = b + } + + return newBucket(b, bb) +} + +func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) { + bb, err := btx.CreateBucket(name) + if err != nil { + return nil, err + } + + db.rootBucketsLock.Lock() + defer db.rootBucketsLock.Unlock() + + // Always create a new Bucket since CreateBucket above fails if the + // bucket already exists. + b := newBucketMeta() + db.rootBuckets[string(name)] = b + + return newBucket(b, bb), nil +} + +func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error) { + bb, err := btx.CreateBucketIfNotExists(name) + if err != nil { + return nil, err + } + + db.rootBucketsLock.Lock() + defer db.rootBucketsLock.Unlock() + + b, ok := db.rootBuckets[string(name)] + if !ok { + b = newBucketMeta() + db.rootBuckets[string(name)] = b + } + + return newBucket(b, bb), nil +} + +func (db *DB) Update(fn func(*Tx) error) error { + return db.bdb.Update(func(btx *bolt.Tx) error { + tx := newTx(db, btx) + return fn(tx) + }) +} + +func (db *DB) View(fn func(*Tx) error) error { + return db.bdb.View(func(btx *bolt.Tx) error { + tx := newTx(db, btx) + return fn(tx) + }) +} + +// Close closes the underlying bolt.DB and clears all bucket hashes. DB is +// unusable after closing. +func (db *DB) Close() error { + db.rootBuckets = nil + return db.bdb.Close() +} + +// BoltDB returns the underlying bolt.DB. 
+func (db *DB) BoltDB() *bolt.DB { + return db.bdb +} + +type Tx struct { + db *DB + btx *bolt.Tx +} + +func newTx(db *DB, btx *bolt.Tx) *Tx { + return &Tx{ + db: db, + btx: btx, + } +} + +// Bucket returns a root bucket or nil if it doesn't exist. +func (tx *Tx) Bucket(name []byte) *Bucket { + return tx.db.bucket(tx.btx, name) +} + +func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { + return tx.db.createBucket(tx.btx, name) +} + +// CreateBucketIfNotExists returns a root bucket or creates a new one if it +// doesn't already exist. +func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { + return tx.db.createBucketIfNotExists(tx.btx, name) +} + +// Writable wraps boltdb Tx.Writable. +func (tx *Tx) Writable() bool { + return tx.btx.Writable() +} + +// BoltTx returns the underlying bolt.Tx. +func (tx *Tx) BoltTx() *bolt.Tx { + return tx.btx +} + +// bucketMeta persists metadata -- such as key hashes and child buckets -- +// about boltdb Buckets across transactions. +type bucketMeta struct { + // hashes holds all of the value hashes for keys in this bucket + hashes map[string][]byte + hashesLock sync.Mutex + + // buckets holds all of the child buckets + buckets map[string]*bucketMeta + bucketsLock sync.Mutex +} + +func newBucketMeta() *bucketMeta { + return &bucketMeta{ + hashes: make(map[string][]byte), + buckets: make(map[string]*bucketMeta), + } +} + +// getHash of last value written to a key or nil if no hash exists. +func (bm *bucketMeta) getHash(hashKey string) []byte { + bm.hashesLock.Lock() + lastHash := bm.hashes[hashKey] + bm.hashesLock.Unlock() + return lastHash +} + +// setHash of last value written to key. +func (bm *bucketMeta) setHash(hashKey string, hashVal []byte) { + bm.hashesLock.Lock() + bm.hashes[hashKey] = hashVal + bm.hashesLock.Unlock() +} + +// delHash deletes a hash value or does nothing if the hash key does not exist.
+func (bm *bucketMeta) delHash(hashKey string) { + bm.hashesLock.Lock() + delete(bm.hashes, hashKey) + bm.hashesLock.Unlock() +} + +// createBucket metadata entry for the given nested bucket. Overwrites any +// existing entry so caller should ensure bucket does not already exist. +func (bm *bucketMeta) createBucket(name []byte) *bucketMeta { + bm.bucketsLock.Lock() + defer bm.bucketsLock.Unlock() + + // Always create a new Bucket since CreateBucket above fails if the + // bucket already exists. + b := newBucketMeta() + bm.buckets[string(name)] = b + return b +} + +// deleteBucket metadata entry for the given nested bucket. Does nothing if +// nested bucket metadata does not exist. +func (bm *bucketMeta) deleteBucket(name []byte) { + bm.bucketsLock.Lock() + delete(bm.buckets, string(name)) + bm.bucketsLock.Unlock() + +} + +// getOrCreateBucket metadata entry for the given nested bucket. +func (bm *bucketMeta) getOrCreateBucket(name []byte) *bucketMeta { + bm.bucketsLock.Lock() + defer bm.bucketsLock.Unlock() + + b, ok := bm.buckets[string(name)] + if !ok { + b = newBucketMeta() + bm.buckets[string(name)] = b + } + return b +} + +type Bucket struct { + bm *bucketMeta + boltBucket *bolt.Bucket +} + +// newBucket creates a new view into a bucket backed by a boltdb +// transaction. +func newBucket(b *bucketMeta, bb *bolt.Bucket) *Bucket { + return &Bucket{ + bm: b, + boltBucket: bb, + } +} + +// Put into boltdb iff it has changed since the last write. 
+func (b *Bucket) Put(key []byte, val interface{}) error { + // buffer for writing serialized state to + var buf bytes.Buffer + + // Serialize the object + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(val); err != nil { + return fmt.Errorf("failed to encode passed object: %v", err) + } + + // Hash for skipping unnecessary writes + hashKey := string(key) + hashVal := blake2b.Sum256(buf.Bytes()) + + // lastHash value or nil if it hasn't been hashed yet + lastHash := b.bm.getHash(hashKey) + + // If the hashes are equal, skip the write + if bytes.Equal(hashVal[:], lastHash) { + return nil + } + + // New value: write it to the underlying boltdb + if err := b.boltBucket.Put(key, buf.Bytes()); err != nil { + return fmt.Errorf("failed to write data at key %s: %v", key, err) + } + + // New value written, store hash (bucket path map was created above) + b.bm.setHash(hashKey, hashVal[:]) + + return nil + +} + +// Get value by key from boltdb or return an error if key not found. +func (b *Bucket) Get(key []byte, obj interface{}) error { + // Get the raw data from the underlying boltdb + data := b.boltBucket.Get(key) + if data == nil { + return fmt.Errorf("no data at key %v", string(key)) + } + + // Deserialize the object + if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil { + return fmt.Errorf("failed to decode data into passed object: %v", err) + } + + return nil +} + +// Delete removes a key from the bucket. If the key does not exist then nothing +// is done and a nil error is returned. Returns an error if the bucket was +// created from a read-only transaction. +func (b *Bucket) Delete(key []byte) error { + err := b.boltBucket.Delete(key) + b.bm.delHash(string(key)) + return err +} + +// Bucket represents a boltdb Bucket and its associated metadata necessary for +// write deduplication. Like bolt.Buckets it is only valid for the duration of +// the transaction that created it. 
+func (b *Bucket) Bucket(name []byte) *Bucket {
+	nested := b.boltBucket.Bucket(name)
+	if nested == nil {
+		return nil
+	}
+
+	// Reuse the metadata tracked for this child, creating it on first use.
+	return newBucket(b.bm.getOrCreateBucket(name), nested)
+}
+
+// CreateBucket creates a new nested bucket called name and returns a view of
+// it backed by new metadata. Returns an error if the key already exists, if
+// the bucket name is blank, or if the bucket name is too long. The returned
+// bucket is only valid for the lifetime of the transaction.
+func (b *Bucket) CreateBucket(name []byte) (*Bucket, error) {
+	created, err := b.boltBucket.CreateBucket(name)
+	if err != nil {
+		return nil, err
+	}
+
+	// boltdb just created the bucket, so replace any metadata kept for name.
+	return newBucket(b.bm.createBucket(name), created), nil
+}
+
+// CreateBucketIfNotExists returns the nested bucket called name, creating it
+// in boltdb first if it doesn't already exist. The returned bucket is only
+// valid for the lifetime of the transaction.
+func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
+	nested, err := b.boltBucket.CreateBucketIfNotExists(name)
+	if err != nil {
+		return nil, err
+	}
+
+	// The bucket may predate this call, so keep metadata already tracked.
+	return newBucket(b.bm.getOrCreateBucket(name), nested), nil
+}
+
+// DeleteBucket deletes the child bucket name together with its metadata.
+// Returns an error if the bucket does not exist or name is a non-bucket key.
+func (b *Bucket) DeleteBucket(name []byte) error {
+	deleteErr := b.boltBucket.DeleteBucket(name)
+
+	// The metadata entry is removed whether or not boltdb reported an
+	// error; the error itself is handed back to the caller unchanged.
+	b.bm.deleteBucket(name)
+	return deleteErr
+}
+
+// BoltBucket returns the internal bolt.Bucket for this Bucket. Only valid
+// for the duration of the current transaction.
+func (b *Bucket) BoltBucket() *bolt.Bucket {
+	return b.boltBucket
+}
diff --git a/helper/boltdd/boltdd_test.go b/helper/boltdd/boltdd_test.go
new file mode 100644
index 000000000..da3681ef6
--- /dev/null
+++ b/helper/boltdd/boltdd_test.go
@@ -0,0 +1,344 @@
+package boltdd
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/stretchr/testify/require"
+	"github.com/ugorji/go/codec"
+)
+
+// testingT is the subset of *testing.T and *testing.B used by setupBoltDB so
+// that tests and benchmarks can share it.
+type testingT interface {
+	Fatalf(format string, args ...interface{})
+	Logf(format string, args ...interface{})
+}
+
+// setupBoltDB opens a DB in a new temporary directory. The returned func
+// closes the DB and removes the directory; failures to set up are reported
+// through t.Fatalf.
+func setupBoltDB(t testingT) (*DB, func()) {
+	dir, err := ioutil.TempDir("", "nomadtest_")
+	if err != nil {
+		t.Fatalf("error creating tempdir: %v", err)
+	}
+
+	cleanup := func() {
+		if err := os.RemoveAll(dir); err != nil {
+			t.Logf("error removing test dir: %v", err)
+		}
+	}
+
+	dbFilename := filepath.Join(dir, "nomadtest.db")
+	db, err := Open(dbFilename, 0600, nil)
+	if err != nil {
+		cleanup()
+		t.Fatalf("error creating boltdb: %v", err)
+	}
+
+	return db, func() {
+		db.Close()
+		cleanup()
+	}
+}
+
+// TestDB_Open asserts a freshly opened DB reports zero transaction writes.
+func TestDB_Open(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	db, cleanup := setupBoltDB(t)
+	defer cleanup()
+
+	require.Equal(0, db.BoltDB().Stats().TxStats.Write)
+}
+
+// TestBucket_Create asserts bucket lookup, creation, duplicate creation, and
+// create-if-not-exists behave as expected within and across transactions.
+func TestBucket_Create(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	db, cleanup := setupBoltDB(t)
+	defer cleanup()
+
+	name := []byte("create_test")
+
+	require.NoError(db.Update(func(tx *Tx) error {
+		// Trying to get a nonexistent bucket should return nil
+		require.Nil(tx.Bucket(name))
+
+		// Creating a nonexistent bucket should work
+		b, err := tx.CreateBucket(name)
+		require.NoError(err)
+		require.NotNil(b)
+
+		// Recreating a bucket that exists should fail
+		b, err = tx.CreateBucket(name)
+		require.Error(err)
+		require.Nil(b)
+
+		// get or create should work
+		b, err = tx.CreateBucketIfNotExists(name)
+		require.NoError(err)
+		require.NotNil(b)
+		return nil
+	}))
+
+	// Bucket should be visible
+	require.NoError(db.View(func(tx *Tx) error {
+		require.NotNil(tx.Bucket(name))
+		return nil
+	}))
+}
+
+// TestBucket_DedupeWrites asserts that rewriting unchanged values causes fewer
+// boltdb writes than writing changed values.
+func TestBucket_DedupeWrites(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	db, cleanup := setupBoltDB(t)
+	defer cleanup()
+
+	bname := []byte("dedupewrites_test")
+	k1name := []byte("k1")
+	k2name := []byte("k2")
+
+	// Put 2 keys
+	require.NoError(db.Update(func(tx *Tx) error {
+		b, err := tx.CreateBucket(bname)
+		require.NoError(err)
+
+		require.NoError(b.Put(k1name, k1name))
+		require.NoError(b.Put(k2name, k2name))
+		return nil
+	}))
+
+	// Assert there was at least 1 write
+	origWrites := db.BoltDB().Stats().TxStats.Write
+	require.NotZero(origWrites)
+
+	// Write the same values again and expect no new writes
+	require.NoError(db.Update(func(tx *Tx) error {
+		b := tx.Bucket(bname)
+		require.NoError(b.Put(k1name, k1name))
+		require.NoError(b.Put(k2name, k2name))
+		return nil
+	}))
+
+	putWrites := db.BoltDB().Stats().TxStats.Write
+
+	// Unfortunately every committed transaction causes two writes, so this
+	// only saves 1 write operation
+	require.Equal(origWrites+2, putWrites)
+
+	// Write new values and assert more writes took place
+	require.NoError(db.Update(func(tx *Tx) error {
+		b := tx.Bucket(bname)
+		require.NoError(b.Put(k1name, []byte("newval1")))
+		require.NoError(b.Put(k2name, []byte("newval2")))
+		return nil
+	}))
+
+	putWrites2 := db.BoltDB().Stats().TxStats.Write
+
+	// Expect 3 additional writes: 2 for the transaction and one for the
+	// dirty page
+	require.Equal(putWrites+3, putWrites2)
+}
+
+// TestBucket_Delete asserts deleting keys and nested buckets removes the data
+// and that values rewritten afterwards are not deduplicated away.
+func TestBucket_Delete(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	db, cleanup := setupBoltDB(t)
+	defer cleanup()
+
+	parentName := []byte("delete_test")
+	parentKey := []byte("parent_key")
+	childName := []byte("child")
+	childKey := []byte("child_key")
+	grandchildName1 := []byte("grandchild1")
+	grandchildKey1 := []byte("grandchild_key1")
+	grandchildName2 := []byte("grandchild2")
+	grandchildKey2 := []byte("grandchild_key2")
+
+	// Create a parent bucket with 1 child and 2 grandchildren
+	require.NoError(db.Update(func(tx *Tx) error {
+		pb, err := tx.CreateBucket(parentName)
+		require.NoError(err)
+
+		require.NoError(pb.Put(parentKey, parentKey))
+
+		child, err := pb.CreateBucket(childName)
+		require.NoError(err)
+
+		require.NoError(child.Put(childKey, childKey))
+
+		grandchild1, err := child.CreateBucket(grandchildName1)
+		require.NoError(err)
+
+		require.NoError(grandchild1.Put(grandchildKey1, grandchildKey1))
+
+		grandchild2, err := child.CreateBucket(grandchildName2)
+		require.NoError(err)
+
+		require.NoError(grandchild2.Put(grandchildKey2, grandchildKey2))
+		return nil
+	}))
+
+	// Verify grandchild keys wrote
+	require.NoError(db.View(func(tx *Tx) error {
+		grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
+		var v1 []byte
+		require.NoError(grandchild1.Get(grandchildKey1, &v1))
+		require.Equal(grandchildKey1, v1)
+
+		grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
+		var v2 []byte
+		require.NoError(grandchild2.Get(grandchildKey2, &v2))
+		require.Equal(grandchildKey2, v2)
+		return nil
+	}))
+
+	// Delete grandchildKey1 and grandchild2
+	require.NoError(db.Update(func(tx *Tx) error {
+		child := tx.Bucket(parentName).Bucket(childName)
+
+		require.NoError(child.DeleteBucket(grandchildName2))
+
+		grandchild1 := child.Bucket(grandchildName1)
+		require.NoError(grandchild1.Delete(grandchildKey1))
+		return nil
+	}))
+
+	// Ensure grandchildKey1 is gone and grandchild2 alone was deleted
+	require.NoError(db.View(func(tx *Tx) error {
+		grandchild1 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName1)
+		var v1 []byte
+		require.Error(grandchild1.Get(grandchildKey1, &v1))
+		require.Equal(([]byte)(nil), v1)
+
+		grandchild2 := tx.Bucket(parentName).Bucket(childName).Bucket(grandchildName2)
+		require.Nil(grandchild2)
+		return nil
+	}))
+
+	// Deleting child bucket should delete grandchild1 as well
+	require.NoError(db.Update(func(tx *Tx) error {
+		parent := tx.Bucket(parentName)
+		require.NoError(parent.DeleteBucket(childName))
+
+		// Recreate child bucket and ensure childKey and grandchild are gone
+		child, err := parent.CreateBucket(childName)
+		require.NoError(err)
+
+		var v []byte
+		require.Error(child.Get(childKey, &v))
+		require.Equal(([]byte)(nil), v)
+
+		require.Nil(child.Bucket(grandchildName1))
+
+		// Rewrite childKey to make sure it doesn't get deduped incorrectly
+		require.NoError(child.Put(childKey, childKey))
+		return nil
+	}))
+
+	// Ensure childKey was rewritten and not deduped incorrectly
+	require.NoError(db.View(func(tx *Tx) error {
+		var v []byte
+		require.NoError(tx.Bucket(parentName).Bucket(childName).Get(childKey, &v))
+		require.Equal(childKey, v)
+		return nil
+	}))
+}
+
+// BenchmarkWriteDeduplication_On measures repeatedly writing an unchanged
+// alloc through the deduplicating Bucket.Put.
+func BenchmarkWriteDeduplication_On(b *testing.B) {
+	db, cleanup := setupBoltDB(b)
+	defer cleanup()
+
+	bucketName := []byte("allocations")
+	alloc := mock.Alloc()
+	allocID := []byte(alloc.ID)
+
+	err := db.Update(func(tx *Tx) error {
+		allocs, err := tx.CreateBucket(bucketName)
+		if err != nil {
+			return err
+		}
+
+		return allocs.Put(allocID, alloc)
+	})
+
+	if err != nil {
+		b.Fatalf("error setting up: %v", err)
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := db.Update(func(tx *Tx) error {
+			return tx.Bucket(bucketName).Put(allocID, alloc)
+		})
+
+		if err != nil {
+			b.Fatalf("error at runtime: %v", err)
+		}
+	}
+}
+
+// BenchmarkWriteDeduplication_Off measures the same workload written straight
+// to the underlying bolt.Bucket, bypassing deduplication.
+func BenchmarkWriteDeduplication_Off(b *testing.B) {
+	db, cleanup := setupBoltDB(b)
+	defer cleanup()
+
+	bucketName := []byte("allocations")
+	alloc := mock.Alloc()
+	allocID := []byte(alloc.ID)
+
+	err := db.Update(func(tx *Tx) error {
+		allocs, err := tx.CreateBucket(bucketName)
+		if err != nil {
+			return err
+		}
+
+		var buf bytes.Buffer
+		if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
+			return fmt.Errorf("failed to encode passed object: %v", err)
+		}
+
+		return allocs.BoltBucket().Put(allocID, buf.Bytes())
+	})
+
+	if err != nil {
+		b.Fatalf("error setting up: %v", err)
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := db.Update(func(tx *Tx) error {
+			var buf bytes.Buffer
+			if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(alloc); err != nil {
+				return fmt.Errorf("failed to encode passed object: %v", err)
+			}
+
+			return tx.Bucket(bucketName).BoltBucket().Put(allocID, buf.Bytes())
+		})
+
+		if err != nil {
+			b.Fatalf("error at runtime: %v", err)
+		}
+	}
+}