Cuts KVS endpoints over to new fine-grained watch plumbing.
This commit is contained in:
parent
c3a1014fbf
commit
943ca1b2d9
|
@ -387,7 +387,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
Value: []byte("foo"),
|
||||
})
|
||||
fsm.state.KVSDelete(12, "/remove")
|
||||
idx, _, err := fsm.state.KVSList("/remove")
|
||||
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -491,7 +491,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm2.state.KVSGet("/test")
|
||||
_, d, err := fsm2.state.KVSGet(nil, "/test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -617,7 +617,7 @@ func TestFSM_KVSSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -662,7 +662,7 @@ func TestFSM_KVSDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is not set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -708,7 +708,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is not set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -742,7 +742,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -763,7 +763,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is gone
|
||||
_, d, err = fsm.state.KVSGet("/test/path")
|
||||
_, d, err = fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -797,7 +797,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -819,7 +819,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is updated
|
||||
_, d, err = fsm.state.KVSGet("/test/path")
|
||||
_, d, err = fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -976,7 +976,7 @@ func TestFSM_KVSLock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is locked
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1038,7 +1038,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is unlocked
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1234,7 +1234,7 @@ func TestFSM_TombstoneReap(t *testing.T) {
|
|||
Value: []byte("foo"),
|
||||
})
|
||||
fsm.state.KVSDelete(12, "/remove")
|
||||
idx, _, err := fsm.state.KVSList("/remove")
|
||||
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1301,7 +1301,7 @@ func TestFSM_Txn(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify key is set directly in the state store.
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// KVS endpoint is used to manipulate the Key-Value store
|
||||
|
@ -119,12 +120,11 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
return k.srv.blockingRPC(
|
||||
return k.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Key),
|
||||
func() error {
|
||||
index, ent, err := state.KVSGet(args.Key)
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, ent, err := state.KVSGet(ws, args.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -161,12 +161,11 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
return k.srv.blockingRPC(
|
||||
return k.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Key),
|
||||
func() error {
|
||||
index, ent, err := state.KVSList(args.Key)
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, ent, err := state.KVSList(ws, args.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -204,12 +203,11 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
|
|||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
return k.srv.blockingRPC(
|
||||
return k.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetKVSWatch(args.Prefix),
|
||||
func() error {
|
||||
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestKVS_Apply(t *testing.T) {
|
|||
|
||||
// Verify
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.KVSGet("test")
|
||||
_, d, err := state.KVSGet(nil, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func TestKVS_Apply(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify
|
||||
_, d, err = state.KVSGet("test")
|
||||
_, d, err = state.KVSGet(nil, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -119,24 +119,25 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
|
|||
}
|
||||
|
||||
// KVSGet is used to retrieve a key/value pair from the state store.
|
||||
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
||||
func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return s.kvsGetTxn(tx, key)
|
||||
return s.kvsGetTxn(tx, ws, key)
|
||||
}
|
||||
|
||||
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
|
||||
// transaction.
|
||||
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) {
|
||||
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
||||
|
||||
// Retrieve the key.
|
||||
entry, err := tx.First("kvs", "id", key)
|
||||
watchCh, entry, err := tx.FirstWatch("kvs", "id", key)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
if entry != nil {
|
||||
return idx, entry.(*structs.DirEntry), nil
|
||||
}
|
||||
|
@ -147,16 +148,16 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirE
|
|||
// prefix is left empty, all keys in the KVS will be returned. The returned
|
||||
// is the max index of the returned kvs entries or applicable tombstones, or
|
||||
// else it's the full table indexes for kvs and tombstones.
|
||||
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
||||
func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return s.kvsListTxn(tx, prefix)
|
||||
return s.kvsListTxn(tx, ws, prefix)
|
||||
}
|
||||
|
||||
// kvsListTxn is the inner method that gets a list of KVS entries matching a
|
||||
// prefix.
|
||||
func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.DirEntries, error) {
|
||||
func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
||||
// Get the table indexes.
|
||||
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
||||
|
||||
|
@ -165,6 +166,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D
|
|||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
ws.Add(entries.WatchCh())
|
||||
|
||||
// Gather all of the keys found in the store
|
||||
var ents structs.DirEntries
|
||||
|
@ -203,7 +205,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D
|
|||
// An optional separator may be specified, which can be used to slice off a part
|
||||
// of the response so that only a subset of the prefix is returned. In this
|
||||
// mode, the keys which are omitted are still counted in the returned index.
|
||||
func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
|
||||
func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -215,6 +217,7 @@ func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
|
|||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
ws.Add(entries.WatchCh())
|
||||
|
||||
prefixLen := len(prefix)
|
||||
sepLen := len(sep)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
func TestStateStore_GC(t *testing.T) {
|
||||
|
@ -121,7 +122,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
|
|||
|
||||
// Pull out the list and check the index, which should come from the
|
||||
// tombstones.
|
||||
idx, _, err := s.KVSList("foo/")
|
||||
idx, _, err := s.KVSList(nil, "foo/")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -135,7 +136,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
|
|||
}
|
||||
|
||||
// Should still be good because 7 is in there.
|
||||
idx, _, err = s.KVSList("foo/")
|
||||
idx, _, err = s.KVSList(nil, "foo/")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -149,7 +150,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
|
|||
}
|
||||
|
||||
// At this point the sub index will slide backwards.
|
||||
idx, _, err = s.KVSList("foo/")
|
||||
idx, _, err = s.KVSList(nil, "foo/")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -173,7 +174,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Get on an nonexistent key returns nil.
|
||||
idx, result, err := s.KVSGet("foo")
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, result, err := s.KVSGet(ws, "foo")
|
||||
if result != nil || err != nil || idx != 0 {
|
||||
t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
|
||||
}
|
||||
|
@ -186,9 +188,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
if err := s.KVSSet(1, entry); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Retrieve the K/V entry again.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, result, err = s.KVSGet(ws, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -217,9 +223,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
if err := s.KVSSet(2, update); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Fetch the kv pair and check.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, result, err = s.KVSGet(ws, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -242,9 +252,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
if err := s.KVSSet(3, update); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Fetch the kv pair and check.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, result, err = s.KVSGet(ws, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -276,9 +290,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
if !ok || err != nil {
|
||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Fetch the kv pair and check.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, result, err = s.KVSGet(ws, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -304,9 +322,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
if err := s.KVSSet(7, update); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Fetch the kv pair and check.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, result, err = s.KVSGet(ws, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -323,11 +345,17 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
|||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Setting some unrelated key should not fire the watch.
|
||||
testSetKey(t, s, 8, "other", "yup")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Fetch a key that doesn't exist and make sure we get the right
|
||||
// response.
|
||||
idx, result, err = s.KVSGet("nope")
|
||||
if result != nil || err != nil || idx != 7 {
|
||||
t.Fatalf("expected (7, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
|
||||
idx, result, err = s.KVSGet(nil, "nope")
|
||||
if result != nil || err != nil || idx != 8 {
|
||||
t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -335,7 +363,8 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Listing an empty KVS returns nothing
|
||||
idx, entries, err := s.KVSList("")
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, entries, err := s.KVSList(ws, "")
|
||||
if idx != 0 || entries != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err)
|
||||
}
|
||||
|
@ -346,9 +375,12 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
testSetKey(t, s, 3, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
|
||||
testSetKey(t, s, 5, "foo/bar/baz", "baz")
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// List out all of the keys
|
||||
idx, entries, err = s.KVSList("")
|
||||
idx, entries, err = s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -362,7 +394,7 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
}
|
||||
|
||||
// Try listing with a provided prefix
|
||||
idx, entries, err = s.KVSList("foo/bar/zip")
|
||||
idx, entries, err = s.KVSList(nil, "foo/bar/zip")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -379,10 +411,19 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
}
|
||||
|
||||
// Delete a key and make sure the index comes from the tombstone.
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := s.KVSDelete(6, "foo/bar/baz"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("foo/bar/baz")
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -390,11 +431,15 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Set a different key to bump the index.
|
||||
// Set a different key to bump the index. This shouldn't fire the
|
||||
// watch since there's a different prefix.
|
||||
testSetKey(t, s, 7, "some/other/key", "")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Make sure we get the right index from the tombstone.
|
||||
idx, _, err = s.KVSList("foo/bar/baz")
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -407,7 +452,7 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
if err := s.ReapTombstones(6); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("foo/bar/baz")
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -416,7 +461,7 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
}
|
||||
|
||||
// List all the keys to make sure the index is also correct.
|
||||
idx, _, err = s.KVSList("")
|
||||
idx, _, err = s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -429,7 +474,8 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Listing keys with no results returns nil.
|
||||
idx, keys, err := s.KVSListKeys("", "")
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, keys, err := s.KVSListKeys(ws, "", "")
|
||||
if idx != 0 || keys != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err)
|
||||
}
|
||||
|
@ -442,9 +488,12 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
testSetKey(t, s, 5, "foo/bar/zip/zam", "zam")
|
||||
testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp")
|
||||
testSetKey(t, s, 7, "some/other/prefix", "nack")
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// List all the keys.
|
||||
idx, keys, err = s.KVSListKeys("", "")
|
||||
idx, keys, err = s.KVSListKeys(nil, "", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -456,7 +505,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
// Query using a prefix and pass a separator.
|
||||
idx, keys, err = s.KVSListKeys("foo/bar/", "/")
|
||||
idx, keys, err = s.KVSListKeys(nil, "foo/bar/", "/")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -474,7 +523,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
// Listing keys with no separator returns everything.
|
||||
idx, keys, err = s.KVSListKeys("foo", "")
|
||||
idx, keys, err = s.KVSListKeys(nil, "foo", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -488,10 +537,19 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
// Delete a key and make sure the index comes from the tombstone.
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := s.KVSDelete(8, "foo/bar/baz"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -499,11 +557,15 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Set a different key to bump the index.
|
||||
// Set a different key to bump the index. This shouldn't fire the watch
|
||||
// since there's a different prefix.
|
||||
testSetKey(t, s, 9, "some/other/key", "")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Make sure the index still comes from the tombstone.
|
||||
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
|
||||
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -516,7 +578,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
if err := s.ReapTombstones(8); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
|
||||
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -525,7 +587,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
// List all the keys to make sure the index is also correct.
|
||||
idx, _, err = s.KVSListKeys("", "")
|
||||
idx, _, err = s.KVSListKeys(nil, "", "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -573,7 +635,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
|
|||
|
||||
// Check that the tombstone was created and that prevents the index
|
||||
// from sliding backwards.
|
||||
idx, _, err := s.KVSList("foo")
|
||||
idx, _, err := s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -586,7 +648,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
|
|||
if err := s.ReapTombstones(3); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("foo")
|
||||
idx, _, err = s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -620,7 +682,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
|||
|
||||
// Check that the index is untouched and the entry
|
||||
// has not been deleted.
|
||||
idx, e, err := s.KVSGet("foo")
|
||||
idx, e, err := s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -639,7 +701,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was deleted and index was updated
|
||||
idx, e, err = s.KVSGet("bar")
|
||||
idx, e, err = s.KVSGet(nil, "bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -655,7 +717,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
|||
|
||||
// Check that the tombstone was created and that prevents the index
|
||||
// from sliding backwards.
|
||||
idx, _, err = s.KVSList("bar")
|
||||
idx, _, err = s.KVSList(nil, "bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -668,7 +730,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
|||
if err := s.ReapTombstones(4); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("bar")
|
||||
idx, _, err = s.KVSList(nil, "bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -733,7 +795,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was inserted
|
||||
idx, entry, err := s.KVSGet("foo")
|
||||
idx, entry, err := s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -775,7 +837,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was not updated in the store
|
||||
idx, entry, err = s.KVSGet("foo")
|
||||
idx, entry, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -802,7 +864,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was updated
|
||||
idx, entry, err = s.KVSGet("foo")
|
||||
idx, entry, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -829,7 +891,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was updated, but the session should have been ignored.
|
||||
idx, entry, err = s.KVSGet("foo")
|
||||
idx, entry, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -874,7 +936,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Entry was updated, and the lock status should have stayed the same.
|
||||
idx, entry, err = s.KVSGet("foo")
|
||||
idx, entry, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -938,7 +1000,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
|
|||
|
||||
// Check that the tombstones ware created and that prevents the index
|
||||
// from sliding backwards.
|
||||
idx, _, err := s.KVSList("foo")
|
||||
idx, _, err := s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -951,7 +1013,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
|
|||
if err := s.ReapTombstones(5); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("foo")
|
||||
idx, _, err = s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1000,7 +1062,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes got set properly.
|
||||
idx, result, err := s.KVSGet("foo")
|
||||
idx, result, err := s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1021,7 +1083,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
|||
|
||||
// Make sure the indexes got set properly, note that the lock index
|
||||
// won't go up since we didn't lock it again.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
idx, result, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1044,7 +1106,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes got set properly.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
idx, result, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1064,7 +1126,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes got set properly.
|
||||
idx, result, err = s.KVSGet("bar")
|
||||
idx, result, err = s.KVSGet(nil, "bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1090,7 +1152,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes didn't update.
|
||||
idx, result, err = s.KVSGet("bar")
|
||||
idx, result, err = s.KVSGet(nil, "bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1134,7 +1196,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes didn't update.
|
||||
idx, result, err := s.KVSGet("foo")
|
||||
idx, result, err := s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1163,7 +1225,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes didn't update.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
idx, result, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1182,7 +1244,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes got set properly.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
idx, result, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1201,7 +1263,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the indexes didn't update.
|
||||
idx, result, err = s.KVSGet("foo")
|
||||
idx, result, err = s.KVSGet(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1294,7 +1356,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// Read the restored keys back out and verify they match.
|
||||
idx, res, err := s.KVSList("")
|
||||
idx, res, err := s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1467,7 +1529,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
|
|||
if err := s.ReapTombstones(4); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err := s.KVSList("foo/bar")
|
||||
idx, _, err := s.KVSList(nil, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1504,7 +1566,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// See if the stone works properly in a list query.
|
||||
idx, _, err := s.KVSList("foo/bar")
|
||||
idx, _, err := s.KVSList(nil, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1518,7 +1580,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
|
|||
if err := s.ReapTombstones(4); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList("foo/bar")
|
||||
idx, _, err = s.KVSList(nil, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -807,7 +807,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
|
|||
}
|
||||
|
||||
// Key should be unlocked.
|
||||
idx, d2, err := s.KVSGet("/foo")
|
||||
idx, d2, err := s.KVSGet(nil, "/foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -889,7 +889,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
|
|||
}
|
||||
|
||||
// Key should be deleted.
|
||||
idx, d2, err := s.KVSGet("/bar")
|
||||
idx, d2, err := s.KVSGet(nil, "/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ func TestStateStore_Restore_Abort(t *testing.T) {
|
|||
}
|
||||
restore.Abort()
|
||||
|
||||
idx, entries, err := s.KVSList("")
|
||||
idx, entries, err := s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -55,14 +55,14 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
|
|||
}
|
||||
|
||||
case structs.KVSGet:
|
||||
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
|
||||
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key)
|
||||
if entry == nil && err == nil {
|
||||
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSGetTree:
|
||||
var entries structs.DirEntries
|
||||
_, entries, err = s.kvsListTxn(tx, op.DirEnt.Key)
|
||||
_, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key)
|
||||
if err == nil {
|
||||
results := make(structs.TxnResults, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
|
|
|
@ -295,7 +295,7 @@ func TestStateStore_Txn_KVS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.KVSList("")
|
||||
idx, actual, err := s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -364,7 +364,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
|
|||
|
||||
// This function verifies that the state store wasn't changed.
|
||||
verifyStateStore := func(desc string) {
|
||||
idx, actual, err := s.KVSList("")
|
||||
idx, actual, err := s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err (%s): %s", desc, err)
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func TestTxn_Apply(t *testing.T) {
|
|||
|
||||
// Verify the state store directly.
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.KVSGet("test")
|
||||
_, d, err := state.KVSGet(nil, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue