open-consul/consul/state/watch.go

220 lines
5.6 KiB
Go

package state
import (
"fmt"
"sync"
"github.com/armon/go-radix"
)
// Watch is the external interface that's common to all the different flavors
// of watch in this file. Callers register a notification channel with Wait and
// remove it again with Clear.
type Watch interface {
// Wait registers the given channel so that it is notified when the watch
// fires.
Wait(notifyCh chan struct{})
// Clear deregisters the given channel.
Clear(notifyCh chan struct{})
}
// FullTableWatch implements a single notify group for a table.
type FullTableWatch struct {
// group is the notify group that Wait, Clear and Notify delegate to.
group NotifyGroup
}
// NewFullTableWatch allocates a full table watch whose notify group is left at
// its zero value.
func NewFullTableWatch() *FullTableWatch {
	return new(FullTableWatch)
}
// Wait registers notifyCh with the table's notify group. See Watch.
func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
w.group.Wait(notifyCh)
}
// Clear deregisters notifyCh from the table's notify group. See Watch.
func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
w.group.Clear(notifyCh)
}
// Notify wakes up all the watchers registered for this table by notifying the
// table's notify group.
func (w *FullTableWatch) Notify() {
w.group.Notify()
}
// DumbWatchManager is a wrapper that allows nested code to arm full table
// watches multiple times but fire them only once. This doesn't have any
// way to clear the state, and it's not thread-safe, so it should be used once
// and thrown away inside the context of a single thread.
type DumbWatchManager struct {
// tableWatches holds the full table watches, keyed by table name. Arm
// only accepts tables that have an entry here.
tableWatches map[string]*FullTableWatch
// armed tracks which tables should be notified. Notify fires the watch
// of every table that is present as a key.
armed map[string]bool
}
// NewDumbWatchManager builds a dumb watch manager over the given table
// watches. The map is stored as-is (not copied) and the manager starts out
// with no tables armed.
func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
	manager := new(DumbWatchManager)
	manager.tableWatches = tableWatches
	manager.armed = map[string]bool{}
	return manager
}
// Arm marks the given table so that a later call to Notify fires its watch.
// Arming the same table more than once is harmless; Notify still fires it only
// once per call. Arm panics if the table has no entry in the manager's table
// watches.
func (d *DumbWatchManager) Arm(table string) {
	if _, ok := d.tableWatches[table]; !ok {
		panic(fmt.Sprintf("unknown table: %s", table))
	}

	// Setting the flag is idempotent, so there is no need to look the table
	// up in the armed set first.
	d.armed[table] = true
}
// Notify fires the full table watch of every armed table. A table counts as
// armed when it is present as a key in the armed set. The set is not reset
// afterwards, so calling Notify again fires the same tables again.
func (d *DumbWatchManager) Notify() {
	for table := range d.armed {
		d.tableWatches[table].Notify()
	}
}
// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager,
// bound to a specific prefix.
type PrefixWatch struct {
// manager is the underlying watch manager that Wait and Clear are
// forwarded to.
manager *PrefixWatchManager
// prefix is the prefix we are watching; it is passed to the manager on
// every call.
prefix string
}
// Wait registers the given channel with the notify group for our prefix by
// forwarding to the underlying manager.
func (w *PrefixWatch) Wait(notifyCh chan struct{}) {
w.manager.Wait(w.prefix, notifyCh)
}
// Clear deregisters the given channel from the notify group for our prefix by
// forwarding to the underlying manager.
func (w *PrefixWatch) Clear(notifyCh chan struct{}) {
w.manager.Clear(w.prefix, notifyCh)
}
// PrefixWatchManager maintains a notify group for each prefix, allowing for
// much more fine-grained watches.
type PrefixWatchManager struct {
// watches has the set of notify groups, organized by prefix. Each value
// stored in the tree is a *NotifyGroup.
watches *radix.Tree
// lock protects the watches tree. Wait, Clear and Notify hold it for
// their whole duration.
lock sync.Mutex
}
// NewPrefixWatchManager builds a prefix watch manager backed by a freshly
// created radix tree. The lock is left at its zero value.
func NewPrefixWatchManager() *PrefixWatchManager {
	manager := new(PrefixWatchManager)
	manager.watches = radix.New()
	return manager
}
// NewPrefixWatch wraps this manager in a Watch that is bound to the given
// prefix, so callers can use the generic Watch interface without passing the
// prefix on every call.
func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch {
	bound := new(PrefixWatch)
	bound.manager, bound.prefix = w, prefix
	return bound
}
// Wait registers the given channel with the notify group stored under prefix,
// creating and inserting that group first if the tree has none yet. The tree
// lock is held for the whole call.
func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) {
	w.lock.Lock()
	defer w.lock.Unlock()

	entry, found := w.watches.Get(prefix)
	if !found {
		// First watcher on this prefix: start a new group and store it
		// before registering the channel.
		entry = &NotifyGroup{}
		w.watches.Insert(prefix, entry)
	}
	entry.(*NotifyGroup).Wait(notifyCh)
}
// Clear deregisters the given channel from the notify group stored under
// prefix. When the tree has no group for that prefix the call does nothing;
// in particular it never creates one. The tree lock is held for the whole
// call.
func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) {
	w.lock.Lock()
	defer w.lock.Unlock()

	entry, found := w.watches.Get(prefix)
	if !found {
		return
	}
	entry.(*NotifyGroup).Clear(notifyCh)
}
// Notify wakes up all the watchers associated with the given prefix: every
// notify group found while walking the tree along the path to prefix is
// notified. If subtree is true, every group stored under the prefix is
// notified as well, such as when a whole tree of keys is being deleted.
//
// Groups that were notified are removed from the tree afterwards, except for
// the one stored under the empty key, which is notified but kept. The tree
// lock is held for the whole call.
func (w *PrefixWatchManager) Notify(prefix string, subtree bool) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// fired records the key of each group we notify so it can be pruned
	// once the walks are done; the tree is not modified during a walk.
	var fired []string
	visit := func(key string, value interface{}) bool {
		value.(*NotifyGroup).Notify()
		if key == "" {
			// The root group is notified but never scheduled for removal.
			return false
		}
		fired = append(fired, key)

		// Returning false keeps the walk going (go-radix stops a walk
		// when the callback returns true).
		return false
	}

	// Invoke any watcher on the path downward to the key.
	w.watches.WalkPath(prefix, visit)

	// If the entire prefix may be affected (e.g. delete tree), invoke the
	// entire prefix. A group stored exactly at prefix is visited by both
	// walks.
	if subtree {
		w.watches.WalkPrefix(prefix, visit)
	}

	// Delete the old notify groups, most recently visited first.
	for len(fired) > 0 {
		last := len(fired) - 1
		w.watches.Delete(fired[last])
		fired = fired[:last]
	}

	// TODO (slackpad) If a watch never fires then we will never clear it
	// out of the tree. The old state store had the same behavior, so this
	// has been around for a while. We should probably add a prefix scan
	// with a function that clears out any notify groups that are empty.
}
// MultiWatch wraps several watches and allows any of them to trigger the
// caller.
type MultiWatch struct {
// watches holds the list of subordinate watches that Wait and Clear
// calls are forwarded to.
watches []Watch
}
// NewMultiWatch returns a new multi watch over the given set of watches. The
// variadic slice is stored as-is, without copying.
func NewMultiWatch(watches ...Watch) *MultiWatch {
	multi := new(MultiWatch)
	multi.watches = watches
	return multi
}
// Wait registers notifyCh with every subordinate watch, in the order they were
// given, so that any one of them firing notifies the caller. See Watch.
func (w *MultiWatch) Wait(notifyCh chan struct{}) {
	subordinates := w.watches
	for i := 0; i < len(subordinates); i++ {
		subordinates[i].Wait(notifyCh)
	}
}
// Clear deregisters notifyCh from every subordinate watch, in the order they
// were given. See Watch.
func (w *MultiWatch) Clear(notifyCh chan struct{}) {
	subordinates := w.watches
	for i := 0; i < len(subordinates); i++ {
		subordinates[i].Clear(notifyCh)
	}
}