diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 1af011211..84d93370c 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -122,6 +122,8 @@ func verifyWatch(t *testing.T, watch Watch, fn func()) { case <-ch: case <-done: t.Fatalf("watch was not notified") + case <-time.After(1 * time.Second): + t.Fatalf("timeout") } } @@ -141,6 +143,8 @@ func verifyNoWatch(t *testing.T, watch Watch, fn func()) { case <-ch: t.Fatalf("watch should not have been notified") case <-done: + case <-time.After(1 * time.Second): + t.Fatalf("timeout") } } @@ -2339,6 +2343,226 @@ func TestStateStore_KVSUnlock(t *testing.T) { } } +func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { + s := testStateStore(t) + + // Build up some entries to seed. + entries := structs.DirEntries{ + &structs.DirEntry{ + Key: "aaa", + Flags: 23, + Value: []byte("hello"), + }, + &structs.DirEntry{ + Key: "bar/a", + Value: []byte("one"), + }, + &structs.DirEntry{ + Key: "bar/b", + Value: []byte("two"), + }, + &structs.DirEntry{ + Key: "bar/c", + Value: []byte("three"), + }, + } + for i, entry := range entries { + if err := s.KVSSet(uint64(i + 1), entry); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Make a node and session so we can test a locked key. + testRegisterNode(t, s, 5, "node1") + if err := s.SessionCreate(6, &structs.Session{ID: "session1", Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + entries[3].Session = "session1" + if ok, err := s.KVSLock(7, entries[3]); !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // This is required for the compare later. + entries[3].LockIndex = 1 + + // Snapshot the keys. + snap := s.Snapshot() + defer snap.Close() + + // Verify the snapshot. + if idx := snap.LastIndex(); idx != 7 { + t.Fatalf("bad index: %d", idx) + } + dump, err := snap.KVSDump() + if err != nil { + t.Fatalf("err: %s", err) + } + if !reflect.DeepEqual(dump, entries) { + t.Fatalf("bad: %#v", dump) + } + + // Restore the values into a new state store. + func() { + s := testStateStore(t) + for _, entry := range dump { + if err := s.KVSRestore(entry); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Read the restored keys back out and verify they match. + idx, keys, err := s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + for i, key := range keys { + entry, err := s.KVSGet(key) + if err != nil { + t.Fatalf("err: %s", err) + } + + if !reflect.DeepEqual(entry, entries[i]) { + t.Fatalf("bad: %#v", entry) + } + } + }() +} + +func TestStateStore_KVS_Watches(t *testing.T) { + s := testStateStore(t) + + // This is used when locking down below. + testRegisterNode(t, s, 1, "node1") + if err := s.SessionCreate(2, &structs.Session{ID: "session1", Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // An empty prefix watch should hit on all KVS ops, and some other + // prefix should not be affected ever. We also add a positive prefix + // match. + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("b"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSRestore(&structs.DirEntry{Key: "bbb"}); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDelete(3, "aaa"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: "session1"}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: "session1"}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDeleteTree(7, "aaa"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // A delete tree operation at the top level will notify all the watches. + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDeleteTree(8, ""); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // Create a more interesting tree. + testSetKey(t, s, 9, "/foo/bar", "bar") + testSetKey(t, s, 10, "/foo/bar/baz", "baz") + testSetKey(t, s, 11, "/foo/bar/zip", "zip") + testSetKey(t, s, 12, "/foo/zorp", "zorp") + + // Deleting just the foo/bar key should not trigger watches on the + // children. + verifyWatch(t, s.GetKVSWatch("/foo/bar"), func() { + verifyNoWatch(t, s.GetKVSWatch("/foo/bar/baz"), func() { + verifyNoWatch(t, s.GetKVSWatch("/foo/bar/zip"), func() { + if err := s.KVSDelete(13, "/foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // But a delete tree from that point should notify the whole subtree, + // even for keys that don't exist. + verifyWatch(t, s.GetKVSWatch("/foo/bar"), func() { + verifyWatch(t, s.GetKVSWatch("/foo/bar/baz"), func() { + verifyWatch(t, s.GetKVSWatch("/foo/bar/zip"), func() { + verifyWatch(t, s.GetKVSWatch("/foo/bar/uh/nope"), func() { + if err := s.KVSDeleteTree(14, "/foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + }) +} + +// TODO (slackpad) - Tombstone dump/restore. +// TODO (slackpad) - Tombstones in KVS delete. + func TestStateStore_SessionCreate_GetSession(t *testing.T) { s := testStateStore(t)