From 73ad5f06950df1d5d6266b7d1746bc25ac97e631 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 14 Oct 2015 02:11:29 -0700 Subject: [PATCH] Adds watch unit tests and does some related watch cleanup. --- consul/state/state_store_test.go | 42 ----- consul/state/watch.go | 29 +++- consul/state/watch_test.go | 279 +++++++++++++++++++++++++++++++ 3 files changed, 304 insertions(+), 46 deletions(-) create mode 100644 consul/state/watch_test.go diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 9b1f937a5..c527e59b8 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -106,48 +106,6 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { } } -// verifyWatch will set up a watch channel, call the given function, and then -// make sure the watch fires. -func verifyWatch(t *testing.T, watch Watch, fn func()) { - ch := make(chan struct{}) - watch.Wait(ch) - - done := make(chan struct{}) - go func() { - fn() - close(done) - }() - - select { - case <-ch: - case <-done: - t.Fatalf("watch was not notified") - case <-time.After(1 * time.Second): - t.Fatalf("timeout") - } -} - -// verifyNoWatch will set up a watch channel, call the given function, and then -// make sure the watch never fires. -func verifyNoWatch(t *testing.T, watch Watch, fn func()) { - ch := make(chan struct{}) - watch.Wait(ch) - - done := make(chan struct{}) - go func() { - fn() - close(done) - }() - - select { - case <-ch: - t.Fatalf("watch should not have been notified") - case <-done: - case <-time.After(1 * time.Second): - t.Fatalf("timeout") - } -} - func TestStateStore_maxIndex(t *testing.T) { s := testStateStore(t) diff --git a/consul/state/watch.go b/consul/state/watch.go index cb4564356..f8aa273a8 100644 --- a/consul/state/watch.go +++ b/consul/state/watch.go @@ -1,6 +1,7 @@ package state import ( + "fmt" "sync" "github.com/armon/go-radix" @@ -42,10 +43,15 @@ func (w *FullTableWatch) Notify() { } // DumbWatchManager is a wrapper that allows nested code to arm full table -// watches multiple times but fire them only once. +// watches multiple times but fire them only once. This doesn't have any +// way to clear the state, and it's not thread-safe, so it should be used once +// and thrown away inside the context of a single thread. type DumbWatchManager struct { + // tableWatches holds the full table watches. tableWatches map[string]*FullTableWatch - armed map[string]bool + + // armed tracks whether the table should be notified. + armed map[string]bool } // NewDumbWatchManager returns a new dumb watch manager. @@ -58,6 +64,10 @@ func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchMana // Arm arms the given table's watch. func (d *DumbWatchManager) Arm(table string) { + if _, ok := d.tableWatches[table]; !ok { + panic(fmt.Sprintf("unknown table: %s", table)) + } + if _, ok := d.armed[table]; !ok { d.armed[table] = true } @@ -82,7 +92,9 @@ type PrefixWatch struct { // NewPrefixWatch returns a new prefix watch. func NewPrefixWatch() *PrefixWatch { - return &PrefixWatch{watches: radix.New()} + return &PrefixWatch{ + watches: radix.New(), + } } // GetSubwatch returns the notify group for the given prefix. @@ -129,16 +141,25 @@ func (w *PrefixWatch) Notify(prefix string, subtree bool) { for i := len(cleanup) - 1; i >= 0; i-- { w.watches.Delete(cleanup[i]) } + + // TODO (slackpad) If a watch never fires then we will never clear it + // out of the tree. The old state store had the same behavior, so this + // has been around for a while. We should probably add a prefix scan + // with a function that clears out any notify groups that are empty. } // MultiWatch wraps several watches and allows any of them to trigger the // caller. type MultiWatch struct { + // watches holds the list of subordinate watches to forward events to. watches []Watch } +// NewMultiWatch returns a new new multi watch over the given set of watches. func NewMultiWatch(watches ...Watch) *MultiWatch { - return &MultiWatch{watches: watches} + return &MultiWatch{ + watches: watches, + } } // See Watch. diff --git a/consul/state/watch_test.go b/consul/state/watch_test.go new file mode 100644 index 000000000..1d31abe81 --- /dev/null +++ b/consul/state/watch_test.go @@ -0,0 +1,279 @@ +package state + +import ( + "testing" +) + +// verifyWatch will set up a watch channel, call the given function, and then +// make sure the watch fires. +func verifyWatch(t *testing.T, watch Watch, fn func()) { + ch := make(chan struct{}, 1) + watch.Wait(ch) + + fn() + + select { + case <-ch: + default: + t.Fatalf("watch should have been notified") + } +} + +// verifyNoWatch will set up a watch channel, call the given function, and then +// make sure the watch never fires. +func verifyNoWatch(t *testing.T, watch Watch, fn func()) { + ch := make(chan struct{}, 1) + watch.Wait(ch) + + fn() + + select { + case <-ch: + t.Fatalf("watch should not been notified") + default: + } +} + +func TestWatch_FullTableWatch(t *testing.T) { + w := NewFullTableWatch() + + // Test the basic trigger with a single watcher. + verifyWatch(t, w, func() { + w.Notify() + }) + + // Run multiple watchers and make sure they both fire. + verifyWatch(t, w, func() { + verifyWatch(t, w, func() { + w.Notify() + }) + }) + + // Make sure clear works. + ch := make(chan struct{}, 1) + w.Wait(ch) + w.Clear(ch) + w.Notify() + select { + case <-ch: + t.Fatalf("watch should not have been notified") + default: + } + + // Make sure notify is a one shot. + w.Wait(ch) + w.Notify() + select { + case <-ch: + default: + t.Fatalf("watch should have been notified") + } + w.Notify() + select { + case <-ch: + t.Fatalf("watch should not have been notified") + default: + } +} + +func TestWatch_DumbWatchManager(t *testing.T) { + watches := map[string]*FullTableWatch{ + "alice": NewFullTableWatch(), + "bob": NewFullTableWatch(), + "carol": NewFullTableWatch(), + } + + // Notify with nothing armed and make sure nothing triggers. + func() { + w := NewDumbWatchManager(watches) + verifyNoWatch(t, watches["alice"], func() { + verifyNoWatch(t, watches["bob"], func() { + verifyNoWatch(t, watches["carol"], func() { + w.Notify() + }) + }) + }) + }() + + // Trigger one watch. + func() { + w := NewDumbWatchManager(watches) + verifyWatch(t, watches["alice"], func() { + verifyNoWatch(t, watches["bob"], func() { + verifyNoWatch(t, watches["carol"], func() { + w.Arm("alice") + w.Notify() + }) + }) + }) + }() + + // Trigger two watches. + func() { + w := NewDumbWatchManager(watches) + verifyWatch(t, watches["alice"], func() { + verifyNoWatch(t, watches["bob"], func() { + verifyWatch(t, watches["carol"], func() { + w.Arm("alice") + w.Arm("carol") + w.Notify() + }) + }) + }) + }() + + // Trigger all three watches. + func() { + w := NewDumbWatchManager(watches) + verifyWatch(t, watches["alice"], func() { + verifyWatch(t, watches["bob"], func() { + verifyWatch(t, watches["carol"], func() { + w.Arm("alice") + w.Arm("bob") + w.Arm("carol") + w.Notify() + }) + }) + }) + }() + + // Trigger multiple times. + func() { + w := NewDumbWatchManager(watches) + verifyWatch(t, watches["alice"], func() { + verifyNoWatch(t, watches["bob"], func() { + verifyNoWatch(t, watches["carol"], func() { + w.Arm("alice") + w.Arm("alice") + w.Notify() + }) + }) + }) + }() + + // Make sure it panics when asked to arm an unknown table. + func() { + defer func() { + if r := recover(); r == nil { + t.Fatalf("didn't get expected panic") + } + }() + w := NewDumbWatchManager(watches) + w.Arm("nope") + }() +} + +func TestWatch_PrefixWatch(t *testing.T) { + w := NewPrefixWatch() + + // Hit a specific key. + verifyWatch(t, w.GetSubwatch(""), func() { + verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() { + verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.GetSubwatch("nope"), func() { + w.Notify("foo/bar/baz", false) + }) + }) + }) + }) + + // Make sure cleanup is happening. All that should be left is the + // full-table watch and the un-fired watches. + fn := func(k string, v interface{}) bool { + if k != "" && k != "foo/bar/zoo" && k != "nope" { + t.Fatalf("unexpected watch: %s", k) + } + return false + } + w.watches.WalkPrefix("", fn) + + // Delete a subtree. + verifyWatch(t, w.GetSubwatch(""), func() { + verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() { + verifyWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.GetSubwatch("nope"), func() { + w.Notify("foo/", true) + }) + }) + }) + }) + + // Hit an unknown key. + verifyWatch(t, w.GetSubwatch(""), func() { + verifyNoWatch(t, w.GetSubwatch("foo/bar/baz"), func() { + verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.GetSubwatch("nope"), func() { + w.Notify("not/in/there", false) + }) + }) + }) + }) +} + +type MockWatch struct { + Waits map[chan struct{}] int + Clears map[chan struct{}] int +} + +func NewMockWatch() *MockWatch { + return &MockWatch{ + Waits: make(map[chan struct{}] int), + Clears: make(map[chan struct{}] int), + } +} + +func (m *MockWatch) Wait(notifyCh chan struct{}) { + if _, ok := m.Waits[notifyCh]; ok { + m.Waits[notifyCh]++ + } else { + m.Waits[notifyCh] = 1 + } +} + +func (m *MockWatch) Clear(notifyCh chan struct{}) { + if _, ok := m.Clears[notifyCh]; ok { + m.Clears[notifyCh]++ + } else { + m.Clears[notifyCh] = 1 + } +} + +func TestWatch_MultiWatch(t *testing.T) { + w1, w2 := NewMockWatch(), NewMockWatch() + w := NewMultiWatch(w1, w2) + + // Do some activity. + c1, c2 := make(chan struct{}), make(chan struct{}) + w.Wait(c1) + w.Clear(c1) + w.Wait(c1) + w.Wait(c2) + w.Clear(c1) + w.Clear(c2) + + // Make sure all the events were forwarded. + if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 { + t.Fatalf("bad: %d", w1.Waits[c1]) + } + if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 { + t.Fatalf("bad: %d", w1.Clears[c1]) + } + if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 { + t.Fatalf("bad: %d", w1.Waits[c2]) + } + if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 { + t.Fatalf("bad: %d", w1.Clears[c2]) + } + if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 { + t.Fatalf("bad: %d", w2.Waits[c1]) + } + if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 { + t.Fatalf("bad: %d", w2.Clears[c1]) + } + if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 { + t.Fatalf("bad: %d", w2.Waits[c2]) + } + if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 { + t.Fatalf("bad: %d", w2.Clears[c2]) + } +}