Use new DeletePrefixMethod for implementing KVSDeleteTree operation. This makes deletes on sub trees larger than one million nodes about 100 times faster. Added unit tests.
This commit is contained in:
parent
37230611a5
commit
f4cccf44e3
|
@ -420,36 +420,21 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
|
|||
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
|
||||
// existing transaction.
|
||||
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
|
||||
// Get an iterator over all of the keys with the given prefix.
|
||||
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
||||
|
||||
// For prefix deletes, only insert one tombstone and delete the entire subtree
|
||||
|
||||
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||
return fmt.Errorf("failed recursive deleting kvs entry: %s", err)
|
||||
}
|
||||
|
||||
// Go over all of the keys and remove them. We call the delete
|
||||
// directly so that we only update the index once. We also add
|
||||
// tombstones as we go.
|
||||
var modified bool
|
||||
var objs []interface{}
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
e := entry.(*structs.DirEntry)
|
||||
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
||||
// Update the index if the delete was successful.
|
||||
// Missing prefixes don't result in an index update
|
||||
if deleted {
|
||||
if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil {
|
||||
return fmt.Errorf("failed adding to graveyard: %s", err)
|
||||
}
|
||||
objs = append(objs, entry)
|
||||
modified = true
|
||||
}
|
||||
|
||||
// Do the actual deletes in a separate loop so we don't trash the
|
||||
// iterator as we go.
|
||||
for _, obj := range objs {
|
||||
if err := tx.Delete("kvs", obj); err != nil {
|
||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the index
|
||||
if modified {
|
||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
@ -1022,6 +1024,163 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Watches_PrefixDelete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create some KVS entries
|
||||
testSetKey(t, s, 1, "foo", "foo")
|
||||
testSetKey(t, s, 2, "foo/bar", "bar")
|
||||
testSetKey(t, s, 3, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
|
||||
testSetKey(t, s, 5, "foo/bar/baz", "baz")
|
||||
|
||||
// Delete a key and make sure the index comes from the tombstone.
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, _, err := s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
if err := s.KVSDeleteTree(6, "foo/bar"); err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("expected watch to fire but it did not")
|
||||
}
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d, expected %d", idx, 6)
|
||||
}
|
||||
|
||||
// Set a different key to bump the index. This shouldn't fire the
|
||||
// watch since there's a different prefix.
|
||||
testSetKey(t, s, 7, "some/other/key", "")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Make sure we get the right index from the tombstone for the prefix
|
||||
idx, _, err = s.KVSList(nil, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d, expected %v", idx, 7)
|
||||
}
|
||||
|
||||
// Now ask for the index for a node within the prefix that was deleted
|
||||
// We expect to get the max index in the tree because the tombstone contains the parent foo/bar
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d, expected %v", idx, 7)
|
||||
}
|
||||
|
||||
// Now reap the tombstones and make sure we get the latest index
|
||||
// since there are no matching keys.
|
||||
if err := s.ReapTombstones(6); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// List all the keys to make sure the index is also correct.
|
||||
idx, _, err = s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSDeleteTreePrefix(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs entries in the state store.
|
||||
for i := 0; i < 120; i++ {
|
||||
ind := uint64(i + 1)
|
||||
key := "foo/bar" + fmt.Sprintf("%d", ind)
|
||||
testSetKey(t, s, ind, key, "bar")
|
||||
}
|
||||
testSetKey(t, s, 121, "foo/zorp", "zorp")
|
||||
|
||||
// Calling tree deletion which affects nothing does not
|
||||
// modify the table index.
|
||||
if err := s.KVSDeleteTree(129, "bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx := s.maxIndex("kvs"); idx != 121 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Call tree deletion with a nested prefix.
|
||||
if err := s.KVSDeleteTree(122, "foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Check that all the matching keys were deleted
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
entries, err := tx.Get("kvs", "id")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
num := 0
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
if entry.(*structs.DirEntry).Key != "foo/zorp" {
|
||||
t.Fatalf("unexpected kvs entry: %#v", entry)
|
||||
}
|
||||
num++
|
||||
}
|
||||
|
||||
if num != 1 {
|
||||
t.Fatalf("expected 1 key, got: %d", num)
|
||||
}
|
||||
|
||||
// Index should be updated if modifications are made
|
||||
if idx := s.maxIndex("kvs"); idx != 122 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Check that the tombstones ware created and that prevents the index
|
||||
// from sliding backwards.
|
||||
idx, _, err := s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 122 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Now reap the tombstones and watch the index revert to the remaining
|
||||
// foo/zorp key's index.
|
||||
if err := s.ReapTombstones(122); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 121 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSLockDelay(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -68,6 +68,66 @@ func TestKeyWatch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKeyWatch_With_PrefixDelete(t *testing.T) {
|
||||
if consulAddr == "" {
|
||||
t.Skip()
|
||||
}
|
||||
plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
|
||||
invoke := 0
|
||||
deletedKeyWatchInvoked := 0
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil && deletedKeyWatchInvoked == 0 {
|
||||
deletedKeyWatchInvoked++
|
||||
return
|
||||
}
|
||||
if invoke == 0 {
|
||||
v, ok := raw.(*consulapi.KVPair)
|
||||
if !ok || v == nil || string(v.Value) != "test" {
|
||||
t.Fatalf("Bad: %#v", raw)
|
||||
}
|
||||
invoke++
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer plan.Stop()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
kv := plan.client.KV()
|
||||
pair := &consulapi.KVPair{
|
||||
Key: "foo/bar/baz",
|
||||
Value: []byte("test"),
|
||||
}
|
||||
_, err := kv.Put(pair, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the query to run
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Delete the key
|
||||
_, err = kv.DeleteTree("foo/bar", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
plan.Stop()
|
||||
}()
|
||||
|
||||
err := plan.Run(consulAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if invoke != 1 {
|
||||
t.Fatalf("expected watch plan to be invoked once but got %v", invoke)
|
||||
}
|
||||
|
||||
if deletedKeyWatchInvoked != 1 {
|
||||
t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyPrefixWatch(t *testing.T) {
|
||||
if consulAddr == "" {
|
||||
t.Skip()
|
||||
|
|
Loading…
Reference in New Issue