Adds snapshot/restore and watch tests for KVS.
This commit is contained in:
parent
9bfe2c32f6
commit
4be951571e
|
@ -122,6 +122,8 @@ func verifyWatch(t *testing.T, watch Watch, fn func()) {
|
|||
case <-ch:
|
||||
case <-done:
|
||||
t.Fatalf("watch was not notified")
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,6 +143,8 @@ func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
|
|||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2339,6 +2343,226 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Build up some entries to seed.
|
||||
entries := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "aaa",
|
||||
Flags: 23,
|
||||
Value: []byte("hello"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/a",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/b",
|
||||
Value: []byte("two"),
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "bar/c",
|
||||
Value: []byte("three"),
|
||||
},
|
||||
}
|
||||
for i, entry := range entries {
|
||||
if err := s.KVSSet(uint64(i + 1), entry); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Make a node and session so we can test a locked key.
|
||||
testRegisterNode(t, s, 5, "node1")
|
||||
if err := s.SessionCreate(6, &structs.Session{ID: "session1", Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
entries[3].Session = "session1"
|
||||
if ok, err := s.KVSLock(7, entries[3]); !ok || err != nil {
|
||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||
}
|
||||
|
||||
// This is required for the compare later.
|
||||
entries[3].LockIndex = 1
|
||||
|
||||
// Snapshot the keys.
|
||||
snap := s.Snapshot()
|
||||
defer snap.Close()
|
||||
|
||||
// Verify the snapshot.
|
||||
if idx := snap.LastIndex(); idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
dump, err := snap.KVSDump()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(dump, entries) {
|
||||
t.Fatalf("bad: %#v", dump)
|
||||
}
|
||||
|
||||
// Restore the values into a new state store.
|
||||
func() {
|
||||
s := testStateStore(t)
|
||||
for _, entry := range dump {
|
||||
if err := s.KVSRestore(entry); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Read the restored keys back out and verify they match.
|
||||
idx, keys, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
for i, key := range keys {
|
||||
entry, err := s.KVSGet(key)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(entry, entries[i]) {
|
||||
t.Fatalf("bad: %#v", entry)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// This is used when locking down below.
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
if err := s.SessionCreate(2, &structs.Session{ID: "session1", Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// An empty prefix watch should hit on all KVS ops, and some other
|
||||
// prefix should not be affected ever. We also add a positive prefix
|
||||
// match.
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("b"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSRestore(&structs.DirEntry{Key: "bbb"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDelete(3, "aaa"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: "session1"}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: "session1"}); !ok || err != nil {
|
||||
t.Fatalf("ok: %v err: %s", ok, err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDeleteTree(7, "aaa"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// A delete tree operation at the top level will notify all the watches.
|
||||
verifyWatch(t, s.GetKVSWatch(""), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("a"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/nope"), func() {
|
||||
if err := s.KVSDeleteTree(8, ""); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Create a more interesting tree.
|
||||
testSetKey(t, s, 9, "/foo/bar", "bar")
|
||||
testSetKey(t, s, 10, "/foo/bar/baz", "baz")
|
||||
testSetKey(t, s, 11, "/foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 12, "/foo/zorp", "zorp")
|
||||
|
||||
// Deleting just the foo/bar key should not trigger watches on the
|
||||
// children.
|
||||
verifyWatch(t, s.GetKVSWatch("/foo/bar"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("/foo/bar/zip"), func() {
|
||||
if err := s.KVSDelete(13, "/foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// But a delete tree from that point should notify the whole subtree,
|
||||
// even for keys that don't exist.
|
||||
verifyWatch(t, s.GetKVSWatch("/foo/bar"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/foo/bar/baz"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/foo/bar/zip"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("/foo/bar/uh/nope"), func() {
|
||||
if err := s.KVSDeleteTree(14, "/foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// TODO (slackpad) - Tombstone dump/restore.
|
||||
// TODO (slackpad) - Tombstones in KVS delete.
|
||||
|
||||
func TestStateStore_SessionCreate_GetSession(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
Loading…
Reference in New Issue