Adds watch unit tests and does some related watch cleanup.
This commit is contained in:
parent
9fce4aaf35
commit
73ad5f0695
|
@ -106,48 +106,6 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
|
|||
}
|
||||
}
|
||||
|
||||
// verifyWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch fires.
|
||||
func verifyWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{})
|
||||
watch.Wait(ch)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
fn()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
case <-done:
|
||||
t.Fatalf("watch was not notified")
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// verifyNoWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch never fires.
|
||||
func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{})
|
||||
watch.Wait(ch)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
fn()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_maxIndex(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/armon/go-radix"
|
||||
|
@ -42,10 +43,15 @@ func (w *FullTableWatch) Notify() {
|
|||
}
|
||||
|
||||
// DumbWatchManager is a wrapper that allows nested code to arm full table
|
||||
// watches multiple times but fire them only once.
|
||||
// watches multiple times but fire them only once. This doesn't have any
|
||||
// way to clear the state, and it's not thread-safe, so it should be used once
|
||||
// and thrown away inside the context of a single thread.
|
||||
type DumbWatchManager struct {
|
||||
// tableWatches holds the full table watches.
|
||||
tableWatches map[string]*FullTableWatch
|
||||
armed map[string]bool
|
||||
|
||||
// armed tracks whether the table should be notified.
|
||||
armed map[string]bool
|
||||
}
|
||||
|
||||
// NewDumbWatchManager returns a new dumb watch manager.
|
||||
|
@ -58,6 +64,10 @@ func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchMana
|
|||
|
||||
// Arm arms the given table's watch.
|
||||
func (d *DumbWatchManager) Arm(table string) {
|
||||
if _, ok := d.tableWatches[table]; !ok {
|
||||
panic(fmt.Sprintf("unknown table: %s", table))
|
||||
}
|
||||
|
||||
if _, ok := d.armed[table]; !ok {
|
||||
d.armed[table] = true
|
||||
}
|
||||
|
@ -82,7 +92,9 @@ type PrefixWatch struct {
|
|||
|
||||
// NewPrefixWatch returns a new prefix watch.
|
||||
func NewPrefixWatch() *PrefixWatch {
|
||||
return &PrefixWatch{watches: radix.New()}
|
||||
return &PrefixWatch{
|
||||
watches: radix.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// GetSubwatch returns the notify group for the given prefix.
|
||||
|
@ -129,16 +141,25 @@ func (w *PrefixWatch) Notify(prefix string, subtree bool) {
|
|||
for i := len(cleanup) - 1; i >= 0; i-- {
|
||||
w.watches.Delete(cleanup[i])
|
||||
}
|
||||
|
||||
// TODO (slackpad) If a watch never fires then we will never clear it
|
||||
// out of the tree. The old state store had the same behavior, so this
|
||||
// has been around for a while. We should probably add a prefix scan
|
||||
// with a function that clears out any notify groups that are empty.
|
||||
}
|
||||
|
||||
// MultiWatch wraps several watches and allows any of them to trigger the
|
||||
// caller.
|
||||
type MultiWatch struct {
|
||||
// watches holds the list of subordinate watches to forward events to.
|
||||
watches []Watch
|
||||
}
|
||||
|
||||
// NewMultiWatch returns a new new multi watch over the given set of watches.
|
||||
func NewMultiWatch(watches ...Watch) *MultiWatch {
|
||||
return &MultiWatch{watches: watches}
|
||||
return &MultiWatch{
|
||||
watches: watches,
|
||||
}
|
||||
}
|
||||
|
||||
// See Watch.
|
||||
|
|
279
consul/state/watch_test.go
Normal file
279
consul/state/watch_test.go
Normal file
|
@ -0,0 +1,279 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// verifyWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch fires.
|
||||
func verifyWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{}, 1)
|
||||
watch.Wait(ch)
|
||||
|
||||
fn()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("watch should have been notified")
|
||||
}
|
||||
}
|
||||
|
||||
// verifyNoWatch will set up a watch channel, call the given function, and then
|
||||
// make sure the watch never fires.
|
||||
func verifyNoWatch(t *testing.T, watch Watch, fn func()) {
|
||||
ch := make(chan struct{}, 1)
|
||||
watch.Wait(ch)
|
||||
|
||||
fn()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not been notified")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_FullTableWatch(t *testing.T) {
|
||||
w := NewFullTableWatch()
|
||||
|
||||
// Test the basic trigger with a single watcher.
|
||||
verifyWatch(t, w, func() {
|
||||
w.Notify()
|
||||
})
|
||||
|
||||
// Run multiple watchers and make sure they both fire.
|
||||
verifyWatch(t, w, func() {
|
||||
verifyWatch(t, w, func() {
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
|
||||
// Make sure clear works.
|
||||
ch := make(chan struct{}, 1)
|
||||
w.Wait(ch)
|
||||
w.Clear(ch)
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
default:
|
||||
}
|
||||
|
||||
// Make sure notify is a one shot.
|
||||
w.Wait(ch)
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
t.Fatalf("watch should have been notified")
|
||||
}
|
||||
w.Notify()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("watch should not have been notified")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_DumbWatchManager(t *testing.T) {
|
||||
watches := map[string]*FullTableWatch{
|
||||
"alice": NewFullTableWatch(),
|
||||
"bob": NewFullTableWatch(),
|
||||
"carol": NewFullTableWatch(),
|
||||
}
|
||||
|
||||
// Notify with nothing armed and make sure nothing triggers.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyNoWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger one watch.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger two watches.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("carol")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger all three watches.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyWatch(t, watches["bob"], func() {
|
||||
verifyWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("bob")
|
||||
w.Arm("carol")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Trigger multiple times.
|
||||
func() {
|
||||
w := NewDumbWatchManager(watches)
|
||||
verifyWatch(t, watches["alice"], func() {
|
||||
verifyNoWatch(t, watches["bob"], func() {
|
||||
verifyNoWatch(t, watches["carol"], func() {
|
||||
w.Arm("alice")
|
||||
w.Arm("alice")
|
||||
w.Notify()
|
||||
})
|
||||
})
|
||||
})
|
||||
}()
|
||||
|
||||
// Make sure it panics when asked to arm an unknown table.
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatalf("didn't get expected panic")
|
||||
}
|
||||
}()
|
||||
w := NewDumbWatchManager(watches)
|
||||
w.Arm("nope")
|
||||
}()
|
||||
}
|
||||
|
||||
func TestWatch_PrefixWatch(t *testing.T) {
|
||||
w := NewPrefixWatch()
|
||||
|
||||
// Hit a specific key.
|
||||
verifyWatch(t, w.GetSubwatch(""), func() {
|
||||
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
|
||||
w.Notify("foo/bar/baz", false)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Make sure cleanup is happening. All that should be left is the
|
||||
// full-table watch and the un-fired watches.
|
||||
fn := func(k string, v interface{}) bool {
|
||||
if k != "" && k != "foo/bar/zoo" && k != "nope" {
|
||||
t.Fatalf("unexpected watch: %s", k)
|
||||
}
|
||||
return false
|
||||
}
|
||||
w.watches.WalkPrefix("", fn)
|
||||
|
||||
// Delete a subtree.
|
||||
verifyWatch(t, w.GetSubwatch(""), func() {
|
||||
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
|
||||
verifyWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
|
||||
w.Notify("foo/", true)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Hit an unknown key.
|
||||
verifyWatch(t, w.GetSubwatch(""), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
|
||||
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
|
||||
w.Notify("not/in/there", false)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type MockWatch struct {
|
||||
Waits map[chan struct{}] int
|
||||
Clears map[chan struct{}] int
|
||||
}
|
||||
|
||||
func NewMockWatch() *MockWatch {
|
||||
return &MockWatch{
|
||||
Waits: make(map[chan struct{}] int),
|
||||
Clears: make(map[chan struct{}] int),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockWatch) Wait(notifyCh chan struct{}) {
|
||||
if _, ok := m.Waits[notifyCh]; ok {
|
||||
m.Waits[notifyCh]++
|
||||
} else {
|
||||
m.Waits[notifyCh] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockWatch) Clear(notifyCh chan struct{}) {
|
||||
if _, ok := m.Clears[notifyCh]; ok {
|
||||
m.Clears[notifyCh]++
|
||||
} else {
|
||||
m.Clears[notifyCh] = 1
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatch_MultiWatch(t *testing.T) {
|
||||
w1, w2 := NewMockWatch(), NewMockWatch()
|
||||
w := NewMultiWatch(w1, w2)
|
||||
|
||||
// Do some activity.
|
||||
c1, c2 := make(chan struct{}), make(chan struct{})
|
||||
w.Wait(c1)
|
||||
w.Clear(c1)
|
||||
w.Wait(c1)
|
||||
w.Wait(c2)
|
||||
w.Clear(c1)
|
||||
w.Clear(c2)
|
||||
|
||||
// Make sure all the events were forwarded.
|
||||
if cnt, ok := w1.Waits[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w1.Waits[c1])
|
||||
}
|
||||
if cnt, ok := w1.Clears[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w1.Clears[c1])
|
||||
}
|
||||
if cnt, ok := w1.Waits[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w1.Waits[c2])
|
||||
}
|
||||
if cnt, ok := w1.Clears[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w1.Clears[c2])
|
||||
}
|
||||
if cnt, ok := w2.Waits[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w2.Waits[c1])
|
||||
}
|
||||
if cnt, ok := w2.Clears[c1]; !ok || cnt != 2 {
|
||||
t.Fatalf("bad: %d", w2.Clears[c1])
|
||||
}
|
||||
if cnt, ok := w2.Waits[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w2.Waits[c2])
|
||||
}
|
||||
if cnt, ok := w2.Clears[c2]; !ok || cnt != 1 {
|
||||
t.Fatalf("bad: %d", w2.Clears[c2])
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue