consul: Adding methods to stop watching for changes

This commit is contained in:
Armon Dadgar 2015-05-14 17:33:02 -07:00
parent abbf4456f2
commit 6f433c9ad8
2 changed files with 45 additions and 0 deletions

View File

@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
}
}
// StopWatch is used to unsubscribe a channel to a set of MDBTables
func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) {
for _, t := range tables {
s.watch[t].Clear(notify)
}
}
// WatchKV is used to subscribe a channel to changes in KV data
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatchLock.Lock()
@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatch.Insert(prefix, grp)
}
// StopWatchKV is used to unsubscribe a channel from changes in KV data
func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) {
s.kvWatchLock.Lock()
defer s.kvWatchLock.Unlock()
// Check for an existing notify group
if raw, ok := s.kvWatch.Get(prefix); ok {
grp := raw.(*NotifyGroup)
grp.Clear(notify)
}
}
// notifyKV is used to notify any KV listeners of a change
// on a prefix
func (s *StateStore) notifyKV(path string, prefix bool) {

View File

@ -1429,6 +1429,32 @@ func TestKVSSet_Watch(t *testing.T) {
}
}
func TestKVSSet_Watch_Stop(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
notify1 := make(chan struct{}, 1)
store.WatchKV("", notify1)
store.StopWatchKV("", notify1)
// Create the entry
d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
// Check that we've not fired notify1
select {
case <-notify1:
t.Fatalf("should not notify ")
default:
}
}
func TestKVSSet_Get(t *testing.T) {
store, err := testStateStore()
if err != nil {