From 9b4f8cd800fa93c10cf6ce752ff2e36634a4243c Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 1 Sep 2015 16:50:28 -0700 Subject: [PATCH] consul/state: adding shallow delete for kvs store --- consul/state/state_store.go | 40 +++++++++++++++++++++-- consul/state/state_store_test.go | 55 ++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 447686230..880477432 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -719,7 +719,7 @@ func (s *StateStore) kvsSetTxn( // Retrieve an existing KV pair existing, err := tx.First("kvs", "id", entry.Key) if err != nil { - return fmt.Errorf("failed key lookup: %s", err) + return fmt.Errorf("failed kvs lookup: %s", err) } // Set the indexes @@ -733,7 +733,7 @@ func (s *StateStore) kvsSetTxn( // Store the kv pair in the state store and update the index if err := tx.Insert("kvs", entry); err != nil { - return fmt.Errorf("failed inserting kv entry: %s", err) + return fmt.Errorf("failed inserting kvs entry: %s", err) } if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) @@ -750,7 +750,41 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) { entry, err := tx.First("kvs", "id", key) if err != nil { - return nil, fmt.Errorf("failed key lookup: %s", err) + return nil, fmt.Errorf("failed kvs lookup: %s", err) } return entry.(*structs.DirEntry), nil } + +// KVSDelete is used to perform a shallow delete on a single key in the +// the state store. +func (s *StateStore) KVSDelete(idx uint64, key string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Perform the actual delete + if err := s.kvsDeleteTxn(idx, key, tx); err != nil { + return err + } + + tx.Commit() + return nil +} + +// kvsDeleteTxn is the inner method used to perform the actual deletion +// of a key/value pair within an existing transaction. +func (s *StateStore) kvsDeleteTxn(idx uint64, key string, tx *memdb.Txn) error { + // Look up the entry in the state store + entry, err := tx.First("kvs", "id", key) + if err != nil { + return fmt.Errorf("failed key lookup: %s", err) + } + + // Delete the entry and update the index + if err := tx.Delete("kvs", entry); err != nil { + return fmt.Errorf("failed deleting kvs entry: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 2746a642a..7008f4ccd 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -82,6 +82,23 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64, } } +func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { + entry := &structs.DirEntry{Key: key, Value: []byte(value)} + if err := s.KVSSet(idx, entry); err != nil { + t.Fatalf("err: %s", err) + } + + tx := s.db.Txn(false) + defer tx.Abort() + e, err := tx.First("kvs", "id", key) + if err != nil { + t.Fatalf("err: %s", err) + } + if result, ok := e.(*structs.DirEntry); !ok || result.Key != key { + t.Fatalf("bad kvs entry: %#v", result) + } +} + func TestStateStore_EnsureNode(t *testing.T) { s := testStateStore(t) @@ -828,3 +845,41 @@ func TestStateStore_KVSSet(t *testing.T) { t.Fatalf("expected 'baz', got '%s'", v) } } + +func TestStateStore_KVSDelete(t *testing.T) { + s := testStateStore(t) + + // Create some KV pairs + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + + // Call a delete on a specific key + if err := s.KVSDelete(3, "foo"); err != nil { + t.Fatalf("err: %s", err) + } + + // The entry was removed from the state store + tx := s.db.Txn(false) + defer tx.Abort() + e, err := tx.First("kvs", "id", "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if e != nil { + t.Fatalf("expected kvs entry to be deleted, got: %#v", e) + } + + // Try fetching the other keys to ensure they still exist + e, err = tx.First("kvs", "id", "foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if e == nil || string(e.(*structs.DirEntry).Value) != "bar" { + t.Fatalf("bad kvs entry: %#v", e) + } + + // Check that the index table was updated + if idx := s.maxIndex("kvs"); idx != 3 { + t.Fatalf("bad index: %d", idx) + } +}