Merge pull request #578 from hashicorp/f-kv-watch

Optimize performance of KV watchers
This commit is contained in:
Armon Dadgar 2015-01-05 17:11:23 -08:00
commit c45bd63938
5 changed files with 333 additions and 33 deletions

View File

@ -90,10 +90,12 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions, opts := blockingRPCOptions{
&reply.QueryMeta, queryOpts: &args.QueryOptions,
state.QueryTables("KVSGet"), queryMeta: &reply.QueryMeta,
func() error { kvWatch: true,
kvPrefix: args.Key,
run: func() error {
index, ent, err := state.KVSGet(args.Key) index, ent, err := state.KVSGet(args.Key)
if err != nil { if err != nil {
return err return err
@ -115,7 +117,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Entries = structs.DirEntries{ent} reply.Entries = structs.DirEntries{ent}
} }
return nil return nil
}) },
}
return k.srv.blockingRPCOpt(&opts)
} }
// List is used to list all keys with a given prefix // List is used to list all keys with a given prefix
@ -131,10 +135,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions, opts := blockingRPCOptions{
&reply.QueryMeta, queryOpts: &args.QueryOptions,
state.QueryTables("KVSList"), queryMeta: &reply.QueryMeta,
func() error { kvWatch: true,
kvPrefix: args.Key,
run: func() error {
tombIndex, index, ent, err := state.KVSList(args.Key) tombIndex, index, ent, err := state.KVSList(args.Key)
if err != nil { if err != nil {
return err return err
@ -166,7 +172,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Entries = ent reply.Entries = ent
} }
return nil return nil
}) },
}
return k.srv.blockingRPCOpt(&opts)
} }
// ListKeys is used to list all keys with a given prefix to a seperator // ListKeys is used to list all keys with a given prefix to a seperator
@ -182,10 +190,12 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions, opts := blockingRPCOptions{
&reply.QueryMeta, queryOpts: &args.QueryOptions,
state.QueryTables("KVSListKeys"), queryMeta: &reply.QueryMeta,
func() error { kvWatch: true,
kvPrefix: args.Prefix,
run: func() error {
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
reply.Index = index reply.Index = index
if acl != nil { if acl != nil {
@ -193,5 +203,8 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
} }
reply.Keys = keys reply.Keys = keys
return err return err
})
},
}
return k.srv.blockingRPCOpt(&opts)
} }

View File

@ -279,6 +279,101 @@ func TestKVSEndpoint_List(t *testing.T) {
} }
} }
func TestKVSEndpoint_List_Blocking(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
keys := []string{
"/test/key1",
"/test/key2",
"/test/sub/key3",
}
for _, key := range keys {
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: key,
Flags: 1,
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "/test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
// Setup a blocking query
getR.MinQueryIndex = dirent.Index
getR.MaxQueryTime = time.Second
// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
client := rpcClient(t, s1)
defer client.Close()
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "/test/sub/key3",
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Re-run the query
dirent = structs.IndexedDirEntries{}
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
// Should block at least 100ms
if time.Now().Sub(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 2 {
for _, ent := range dirent.Entries {
t.Errorf("Bad: %#v", *ent)
}
}
for i := 0; i < len(dirent.Entries); i++ {
d := dirent.Entries[i]
if d.Key != keys[i] {
t.Fatalf("bad: %v", d)
}
if d.Flags != 1 {
t.Fatalf("bad: %v", d)
}
if d.Value != nil {
t.Fatalf("bad: %v", d)
}
}
}
func TestKVSEndpoint_List_ACLDeny(t *testing.T) { func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) { dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1" c.ACLDatacenter = "dc1"

View File

@ -289,57 +289,84 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// minimum index. This is used to block and wait for changes. // minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error { tables MDBTables, run func() error) error {
opts := blockingRPCOptions{
queryOpts: b,
queryMeta: m,
tables: tables,
run: run,
}
return s.blockingRPCOpt(&opts)
}
// blockingRPCOptions is used to parameterize blockingRPCOpt since
// it takes so many options. It should be prefered over blockingRPC.
type blockingRPCOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
tables MDBTables
kvWatch bool
kvPrefix string
run func() error
}
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be prefered over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout <-chan time.Time var timeout <-chan time.Time
var notifyCh chan struct{} var notifyCh chan struct{}
// Fast path non-blocking // Fast path non-blocking
if b.MinQueryIndex == 0 { if opts.queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY goto RUN_QUERY
} }
// Sanity check that we have tables to block on // Sanity check that we have tables to block on
if len(tables) == 0 { if len(opts.tables) == 0 && !opts.kvWatch {
panic("no tables to block on") panic("no tables to block on")
} }
// Restrict the max query time // Restrict the max query time
if b.MaxQueryTime > maxQueryTime { if opts.queryOpts.MaxQueryTime > maxQueryTime {
b.MaxQueryTime = maxQueryTime opts.queryOpts.MaxQueryTime = maxQueryTime
} }
// Ensure a time limit is set if we have an index // Ensure a time limit is set if we have an index
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
b.MaxQueryTime = maxQueryTime opts.queryOpts.MaxQueryTime = maxQueryTime
} }
// Setup a query timeout // Setup a query timeout
if b.MaxQueryTime > 0 { if opts.queryOpts.MaxQueryTime > 0 {
timeout = time.After(b.MaxQueryTime) timeout = time.After(opts.queryOpts.MaxQueryTime)
} }
// Setup a notification channel for changes // Setup a notification channel for changes
SETUP_NOTIFY: SETUP_NOTIFY:
if b.MinQueryIndex > 0 { if opts.queryOpts.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1) notifyCh = make(chan struct{}, 1)
s.fsm.State().Watch(tables, notifyCh) state := s.fsm.State()
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
} }
RUN_QUERY: RUN_QUERY:
// Update the query meta data // Update the query meta data
s.setQueryMeta(m) s.setQueryMeta(opts.queryMeta)
// Check if query must be consistent // Check if query must be consistent
if b.RequireConsistent { if opts.queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil { if err := s.consistentRead(); err != nil {
return err return err
} }
} }
// Run the query function // Run the query function
err := run() err := opts.run()
// Check for minimum query time // Check for minimum query time
if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select { select {
case <-notifyCh: case <-notifyCh:
goto SETUP_NOTIFY goto SETUP_NOTIFY

View File

@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/armon/go-radix"
"github.com/armon/gomdb" "github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -63,6 +64,14 @@ type StateStore struct {
watch map[*MDBTable]*NotifyGroup watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables queryTables map[string]MDBTables
// kvWatch is a more optimized way of watching for KV changes.
// Instead of just using a NotifyGroup for the entire table,
// a watcher is instantiated on a given prefix. When a change happens,
// only the relevant watchers are woken up. This reduces the cost of
// watching for KV changes.
kvWatch *radix.Tree
kvWatchLock sync.Mutex
// lockDelay is used to mark certain locks as unacquirable. // lockDelay is used to mark certain locks as unacquirable.
// When a lock is forcefully released (failing health // When a lock is forcefully released (failing health
// check, destroyed session, etc), it is subject to the LockDelay // check, destroyed session, etc), it is subject to the LockDelay
@ -131,6 +140,7 @@ func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*Stat
path: path, path: path,
env: env, env: env,
watch: make(map[*MDBTable]*NotifyGroup), watch: make(map[*MDBTable]*NotifyGroup),
kvWatch: radix.New(),
lockDelay: make(map[string]time.Time), lockDelay: make(map[string]time.Time),
gc: gc, gc: gc,
} }
@ -414,6 +424,58 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
} }
} }
// WatchKV is used to subscribe a channel to changes in KV data
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatchLock.Lock()
defer s.kvWatchLock.Unlock()
// Check for an existing notify group
if raw, ok := s.kvWatch.Get(prefix); ok {
grp := raw.(*NotifyGroup)
grp.Wait(notify)
return
}
// Create new notify group
grp := &NotifyGroup{}
grp.Wait(notify)
s.kvWatch.Insert(prefix, grp)
}
// notifyKV is used to notify any KV listeners of a change
// on a prefix
func (s *StateStore) notifyKV(path string, prefix bool) {
// Backwards compatibility for old listeners
s.watch[s.kvsTable].Notify()
s.kvWatchLock.Lock()
defer s.kvWatchLock.Unlock()
var toDelete []string
fn := func(s string, v interface{}) bool {
group := v.(*NotifyGroup)
group.Notify()
if s != "" {
toDelete = append(toDelete, s)
}
return false
}
// Invoke any watcher on the path downward to the key.
s.kvWatch.WalkPath(path, fn)
// If the entire prefix may be affected (e.g. delete tree),
// invoke the entire prefix
if prefix {
s.kvWatch.WalkPrefix(path, fn)
}
// Delete the old watch groups
for i := len(toDelete) - 1; i >= 0; i-- {
s.kvWatch.Delete(toDelete[i])
}
}
// QueryTables returns the Tables that are queried for a given query // QueryTables returns the Tables that are queried for a given query
func (s *StateStore) QueryTables(q string) MDBTables { func (s *StateStore) QueryTables(q string) MDBTables {
return s.queryTables[q] return s.queryTables[q]
@ -1298,7 +1360,17 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex
return err return err
} }
tx.Defer(func() { tx.Defer(func() {
s.watch[s.kvsTable].Notify() // Trigger the most fine grained notifications if possible
switch {
case len(parts) == 0:
s.notifyKV("", true)
case tableIndex == "id":
s.notifyKV(parts[0], false)
case tableIndex == "id_prefix":
s.notifyKV(parts[0], true)
default:
s.notifyKV("", true)
}
if s.gc != nil { if s.gc != nil {
// If GC is configured, then we hint that this index // If GC is configured, then we hint that this index
// required expiration. // required expiration.
@ -1426,7 +1498,7 @@ func (s *StateStore) kvsSet(
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return false, err return false, err
} }
tx.Defer(func() { s.watch[s.kvsTable].Notify() }) tx.Defer(func() { s.notifyKV(d.Key, false) })
return true, tx.Commit() return true, tx.Commit()
} }
@ -1785,12 +1857,12 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
s.lockDelayLock.Unlock() s.lockDelayLock.Unlock()
}) })
} }
tx.Defer(func() { s.notifyKV(kv.Key, false) })
} }
if len(pairs) > 0 { if len(pairs) > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err return err
} }
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
} }
return nil return nil
} }

View File

@ -1379,6 +1379,45 @@ func TestNodeDump(t *testing.T) {
} }
} }
func TestKVSSet_Watch(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
notify1 := make(chan struct{}, 1)
notify2 := make(chan struct{}, 1)
notify3 := make(chan struct{}, 1)
store.WatchKV("", notify1)
store.WatchKV("foo/", notify2)
store.WatchKV("foo/bar", notify3)
// Create the entry
d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
// Check that we've fired notify1 and notify2
select {
case <-notify1:
default:
t.Fatalf("should notify root")
}
select {
case <-notify2:
default:
t.Fatalf("should notify foo/")
}
select {
case <-notify3:
t.Fatalf("should not notify foo/bar")
default:
}
}
func TestKVSSet_Get(t *testing.T) { func TestKVSSet_Get(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {
@ -1481,11 +1520,21 @@ func TestKVSDelete(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
notify1 := make(chan struct{}, 1)
store.WatchKV("/", notify1)
// Delete the entry // Delete the entry
if err := store.KVSDelete(1020, "/foo"); err != nil { if err := store.KVSDelete(1020, "/foo"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Check that we've fired notify1
select {
case <-notify1:
default:
t.Fatalf("should notify /")
}
// Should not exist // Should not exist
idx, d, err := store.KVSGet("/foo") idx, d, err := store.KVSGet("/foo")
if err != nil { if err != nil {
@ -1938,6 +1987,14 @@ func TestKVSDeleteTree(t *testing.T) {
gc.SetEnabled(true) gc.SetEnabled(true)
store.gc = gc store.gc = gc
notify1 := make(chan struct{}, 1)
notify2 := make(chan struct{}, 1)
notify3 := make(chan struct{}, 1)
store.WatchKV("", notify1)
store.WatchKV("/web/sub", notify2)
store.WatchKV("/other", notify3)
// Should not exist // Should not exist
err = store.KVSDeleteTree(1000, "/web") err = store.KVSDeleteTree(1000, "/web")
if err != nil { if err != nil {
@ -1993,6 +2050,23 @@ func TestKVSDeleteTree(t *testing.T) {
} }
} }
// Check that we've fired notify1 and notify2
select {
case <-notify1:
default:
t.Fatalf("should notify root")
}
select {
case <-notify2:
default:
t.Fatalf("should notify /web/sub")
}
select {
case <-notify3:
t.Fatalf("should not notify /other")
default:
}
// Check that we get a delete // Check that we get a delete
select { select {
case idx := <-gc.ExpireCh(): case idx := <-gc.ExpireCh():
@ -2560,7 +2634,6 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer store.Close() defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -2588,6 +2661,9 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
t.Fatalf("unexpected fail") t.Fatalf("unexpected fail")
} }
notify1 := make(chan struct{}, 1)
store.WatchKV("/f", notify1)
// Delete the node // Delete the node
if err := store.DeleteNode(6, "foo"); err != nil { if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -2605,6 +2681,13 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
t.Fatalf("bad: %v", *d2) t.Fatalf("bad: %v", *d2)
} }
// Should notify of update
select {
case <-notify1:
default:
t.Fatalf("should notify /f")
}
// Key should have a lock delay // Key should have a lock delay
expires := store.KVSLockDelay("/foo") expires := store.KVSLockDelay("/foo")
if expires.Before(time.Now().Add(30 * time.Millisecond)) { if expires.Before(time.Now().Add(30 * time.Millisecond)) {
@ -2647,6 +2730,9 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
t.Fatalf("unexpected fail") t.Fatalf("unexpected fail")
} }
notify1 := make(chan struct{}, 1)
store.WatchKV("/f", notify1)
// Delete the node // Delete the node
if err := store.DeleteNode(6, "foo"); err != nil { if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -2657,6 +2743,13 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
if d2 != nil { if d2 != nil {
t.Fatalf("unexpected undeleted key") t.Fatalf("unexpected undeleted key")
} }
// Should notify of update
select {
case <-notify1:
default:
t.Fatalf("should notify /f")
}
} }
func TestACLSet_Get(t *testing.T) { func TestACLSet_Get(t *testing.T) {