diff --git a/consul/state_store.go b/consul/state_store.go index 825c0131a..104253418 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/armon/go-radix" "github.com/armon/gomdb" "github.com/hashicorp/consul/consul/structs" ) @@ -63,6 +64,14 @@ type StateStore struct { watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables + // kvWatch is a more optimized way of watching for KV changes. + // Instead of just using a NotifyGroup for the entire table, + // a watcher is instantiated on a given prefix. When a change happens, + // only the relevant watchers are woken up. This reduces the cost of + // watching for KV changes. + kvWatch *radix.Tree + kvWatchLock sync.Mutex + // lockDelay is used to mark certain locks as unacquirable. // When a lock is forcefully released (failing health // check, destroyed session, etc), it is subject to the LockDelay @@ -131,6 +140,7 @@ func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*Stat path: path, env: env, watch: make(map[*MDBTable]*NotifyGroup), + kvWatch: radix.New(), lockDelay: make(map[string]time.Time), gc: gc, } @@ -414,6 +424,58 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) { } } +// WatchKV is used to subscribe a channel to changes in KV data +func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + // Check for an existing notify group + if raw, ok := s.kvWatch.Get(prefix); ok { + grp := raw.(*NotifyGroup) + grp.Wait(notify) + return + } + + // Create new notify group + grp := &NotifyGroup{} + grp.Wait(notify) + s.kvWatch.Insert(prefix, grp) +} + +// notifyKV is used to notify any KV listeners of a change +// on a prefix +func (s *StateStore) notifyKV(path string, prefix bool) { + // Backwards compatibility for old listeners + s.watch[s.kvsTable].Notify() + + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + var toDelete []string + fn := func(s string, v interface{}) bool { + group := v.(*NotifyGroup) + group.Notify() + if s != "" { + toDelete = append(toDelete, s) + } + return false + } + + // Invoke any watcher on the path downward to the key. + s.kvWatch.WalkPath(path, fn) + + // If the entire prefix may be affected (e.g. delete tree), + // invoke the entire prefix + if prefix { + s.kvWatch.WalkPrefix(path, fn) + } + + // Delete the old watch groups + for i := len(toDelete) - 1; i >= 0; i-- { + s.kvWatch.Delete(toDelete[i]) + } +} + // QueryTables returns the Tables that are queried for a given query func (s *StateStore) QueryTables(q string) MDBTables { return s.queryTables[q] @@ -1298,7 +1360,17 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex return err } tx.Defer(func() { - s.watch[s.kvsTable].Notify() + // Trigger the most fine grained notifications if possible + switch { + case len(parts) == 0: + s.notifyKV("", true) + case tableIndex == "id": + s.notifyKV(parts[0], false) + case tableIndex == "id_prefix": + s.notifyKV(parts[0], true) + default: + s.notifyKV("", true) + } if s.gc != nil { // If GC is configured, then we hint that this index // required expiration. @@ -1426,7 +1498,7 @@ func (s *StateStore) kvsSet( if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return false, err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + tx.Defer(func() { s.notifyKV(d.Key, false) }) return true, tx.Commit() } @@ -1785,12 +1857,12 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, s.lockDelayLock.Unlock() }) } + tx.Defer(func() { s.notifyKV(kv.Key, false) }) } if len(pairs) > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) } return nil }