consul/state: adding shallow delete for kvs store
This commit is contained in:
parent
b2dc11fed4
commit
9b4f8cd800
|
@ -719,7 +719,7 @@ func (s *StateStore) kvsSetTxn(
|
|||
// Retrieve an existing KV pair
|
||||
existing, err := tx.First("kvs", "id", entry.Key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed key lookup: %s", err)
|
||||
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
||||
// Set the indexes
|
||||
|
@ -733,7 +733,7 @@ func (s *StateStore) kvsSetTxn(
|
|||
|
||||
// Store the kv pair in the state store and update the index
|
||||
if err := tx.Insert("kvs", entry); err != nil {
|
||||
return fmt.Errorf("failed inserting kv entry: %s", err)
|
||||
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
|
@ -750,7 +750,41 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
|
|||
|
||||
entry, err := tx.First("kvs", "id", key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed key lookup: %s", err)
|
||||
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
return entry.(*structs.DirEntry), nil
|
||||
}
|
||||
|
||||
// KVSDelete is used to perform a shallow delete on a single key in the
|
||||
// the state store.
|
||||
func (s *StateStore) KVSDelete(idx uint64, key string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Perform the actual delete
|
||||
if err := s.kvsDeleteTxn(idx, key, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// kvsDeleteTxn is the inner method used to perform the actual deletion
|
||||
// of a key/value pair within an existing transaction.
|
||||
func (s *StateStore) kvsDeleteTxn(idx uint64, key string, tx *memdb.Txn) error {
|
||||
// Look up the entry in the state store
|
||||
entry, err := tx.First("kvs", "id", key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed key lookup: %s", err)
|
||||
}
|
||||
|
||||
// Delete the entry and update the index
|
||||
if err := tx.Delete("kvs", entry); err != nil {
|
||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -82,6 +82,23 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
|
|||
}
|
||||
}
|
||||
|
||||
func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
|
||||
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
|
||||
if err := s.KVSSet(idx, entry); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
e, err := tx.First("kvs", "id", key)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if result, ok := e.(*structs.DirEntry); !ok || result.Key != key {
|
||||
t.Fatalf("bad kvs entry: %#v", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureNode(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -828,3 +845,41 @@ func TestStateStore_KVSSet(t *testing.T) {
|
|||
t.Fatalf("expected 'baz', got '%s'", v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSDelete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create some KV pairs
|
||||
testSetKey(t, s, 1, "foo", "foo")
|
||||
testSetKey(t, s, 2, "foo/bar", "bar")
|
||||
|
||||
// Call a delete on a specific key
|
||||
if err := s.KVSDelete(3, "foo"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// The entry was removed from the state store
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
e, err := tx.First("kvs", "id", "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if e != nil {
|
||||
t.Fatalf("expected kvs entry to be deleted, got: %#v", e)
|
||||
}
|
||||
|
||||
// Try fetching the other keys to ensure they still exist
|
||||
e, err = tx.First("kvs", "id", "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if e == nil || string(e.(*structs.DirEntry).Value) != "bar" {
|
||||
t.Fatalf("bad kvs entry: %#v", e)
|
||||
}
|
||||
|
||||
// Check that the index table was updated
|
||||
if idx := s.maxIndex("kvs"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue