diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index e4b5a36b6..0851c9e4f 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -90,10 +90,12 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSGet"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + kvWatch: true, + kvPrefix: args.Key, + run: func() error { index, ent, err := state.KVSGet(args.Key) if err != nil { return err @@ -115,7 +117,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er reply.Entries = structs.DirEntries{ent} } return nil - }) + }, + } + return k.srv.blockingRPCOpt(&opts) } // List is used to list all keys with a given prefix @@ -131,10 +135,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSList"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + kvWatch: true, + kvPrefix: args.Key, + run: func() error { tombIndex, index, ent, err := state.KVSList(args.Key) if err != nil { return err @@ -166,7 +172,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Entries = ent } return nil - }) + }, + } + return k.srv.blockingRPCOpt(&opts) } // ListKeys is used to list all keys with a given prefix to a seperator @@ -182,10 +190,12 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSListKeys"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + kvWatch: true, + kvPrefix: args.Prefix, + run: func() error { index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) reply.Index = index if acl != nil { @@ -193,5 +203,8 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi } reply.Keys = keys return err - }) + + }, + } + return k.srv.blockingRPCOpt(&opts) } diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 89615a76d..fef4508e7 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -279,6 +279,101 @@ func TestKVSEndpoint_List(t *testing.T) { } } +func TestKVSEndpoint_List_Blocking(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + keys := []string{ + "/test/key1", + "/test/key2", + "/test/sub/key3", + } + + for _, key := range keys { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Flags: 1, + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "/test", + } + var dirent structs.IndexedDirEntries + if err := client.Call("KVS.List", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + // Setup a blocking query + getR.MinQueryIndex = dirent.Index + getR.MaxQueryTime = time.Second + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + client := rpcClient(t, s1) + defer client.Close() + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "/test/sub/key3", + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Re-run the query + dirent = structs.IndexedDirEntries{} + if err := client.Call("KVS.List", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Now().Sub(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 2 { + for _, ent := range dirent.Entries { + t.Errorf("Bad: %#v", *ent) + } + } + for i := 0; i < len(dirent.Entries); i++ { + d := dirent.Entries[i] + if d.Key != keys[i] { + t.Fatalf("bad: %v", d) + } + if d.Flags != 1 { + t.Fatalf("bad: %v", d) + } + if d.Value != nil { + t.Fatalf("bad: %v", d) + } + } +} + func TestKVSEndpoint_List_ACLDeny(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.ACLDatacenter = "dc1" diff --git a/consul/rpc.go b/consul/rpc.go index c98ebb60c..0818f842b 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -289,57 +289,84 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // minimum index. This is used to block and wait for changes. func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, tables MDBTables, run func() error) error { + opts := blockingRPCOptions{ + queryOpts: b, + queryMeta: m, + tables: tables, + run: run, + } + return s.blockingRPCOpt(&opts) +} + +// blockingRPCOptions is used to parameterize blockingRPCOpt since +// it takes so many options. It should be prefered over blockingRPC. +type blockingRPCOptions struct { + queryOpts *structs.QueryOptions + queryMeta *structs.QueryMeta + tables MDBTables + kvWatch bool + kvPrefix string + run func() error +} + +// blockingRPCOpt is the replacement for blockingRPC as it allows +// for more parameterization easily. It should be prefered over blockingRPC. +func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { var timeout <-chan time.Time var notifyCh chan struct{} // Fast path non-blocking - if b.MinQueryIndex == 0 { + if opts.queryOpts.MinQueryIndex == 0 { goto RUN_QUERY } // Sanity check that we have tables to block on - if len(tables) == 0 { + if len(opts.tables) == 0 && !opts.kvWatch { panic("no tables to block on") } // Restrict the max query time - if b.MaxQueryTime > maxQueryTime { - b.MaxQueryTime = maxQueryTime + if opts.queryOpts.MaxQueryTime > maxQueryTime { + opts.queryOpts.MaxQueryTime = maxQueryTime } // Ensure a time limit is set if we have an index - if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { - b.MaxQueryTime = maxQueryTime + if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 { + opts.queryOpts.MaxQueryTime = maxQueryTime } // Setup a query timeout - if b.MaxQueryTime > 0 { - timeout = time.After(b.MaxQueryTime) + if opts.queryOpts.MaxQueryTime > 0 { + timeout = time.After(opts.queryOpts.MaxQueryTime) } // Setup a notification channel for changes SETUP_NOTIFY: - if b.MinQueryIndex > 0 { + if opts.queryOpts.MinQueryIndex > 0 { notifyCh = make(chan struct{}, 1) - s.fsm.State().Watch(tables, notifyCh) + state := s.fsm.State() + state.Watch(opts.tables, notifyCh) + if opts.kvWatch { + state.WatchKV(opts.kvPrefix, notifyCh) + } } RUN_QUERY: // Update the query meta data - s.setQueryMeta(m) + s.setQueryMeta(opts.queryMeta) // Check if query must be consistent - if b.RequireConsistent { + if opts.queryOpts.RequireConsistent { if err := s.consistentRead(); err != nil { return err } } // Run the query function - err := run() + err := opts.run() // Check for minimum query time - if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { + if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { select { case <-notifyCh: goto SETUP_NOTIFY diff --git a/consul/state_store.go b/consul/state_store.go index 825c0131a..104253418 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/armon/go-radix" "github.com/armon/gomdb" "github.com/hashicorp/consul/consul/structs" ) @@ -63,6 +64,14 @@ type StateStore struct { watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables + // kvWatch is a more optimized way of watching for KV changes. + // Instead of just using a NotifyGroup for the entire table, + // a watcher is instantiated on a given prefix. When a change happens, + // only the relevant watchers are woken up. This reduces the cost of + // watching for KV changes. + kvWatch *radix.Tree + kvWatchLock sync.Mutex + // lockDelay is used to mark certain locks as unacquirable. // When a lock is forcefully released (failing health // check, destroyed session, etc), it is subject to the LockDelay @@ -131,6 +140,7 @@ func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*Stat path: path, env: env, watch: make(map[*MDBTable]*NotifyGroup), + kvWatch: radix.New(), lockDelay: make(map[string]time.Time), gc: gc, } @@ -414,6 +424,58 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) { } } +// WatchKV is used to subscribe a channel to changes in KV data +func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + // Check for an existing notify group + if raw, ok := s.kvWatch.Get(prefix); ok { + grp := raw.(*NotifyGroup) + grp.Wait(notify) + return + } + + // Create new notify group + grp := &NotifyGroup{} + grp.Wait(notify) + s.kvWatch.Insert(prefix, grp) +} + +// notifyKV is used to notify any KV listeners of a change +// on a prefix +func (s *StateStore) notifyKV(path string, prefix bool) { + // Backwards compatibility for old listeners + s.watch[s.kvsTable].Notify() + + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + var toDelete []string + fn := func(s string, v interface{}) bool { + group := v.(*NotifyGroup) + group.Notify() + if s != "" { + toDelete = append(toDelete, s) + } + return false + } + + // Invoke any watcher on the path downward to the key. + s.kvWatch.WalkPath(path, fn) + + // If the entire prefix may be affected (e.g. delete tree), + // invoke the entire prefix + if prefix { + s.kvWatch.WalkPrefix(path, fn) + } + + // Delete the old watch groups + for i := len(toDelete) - 1; i >= 0; i-- { + s.kvWatch.Delete(toDelete[i]) + } +} + // QueryTables returns the Tables that are queried for a given query func (s *StateStore) QueryTables(q string) MDBTables { return s.queryTables[q] @@ -1298,7 +1360,17 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex return err } tx.Defer(func() { - s.watch[s.kvsTable].Notify() + // Trigger the most fine grained notifications if possible + switch { + case len(parts) == 0: + s.notifyKV("", true) + case tableIndex == "id": + s.notifyKV(parts[0], false) + case tableIndex == "id_prefix": + s.notifyKV(parts[0], true) + default: + s.notifyKV("", true) + } if s.gc != nil { // If GC is configured, then we hint that this index // required expiration. @@ -1426,7 +1498,7 @@ func (s *StateStore) kvsSet( if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return false, err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + tx.Defer(func() { s.notifyKV(d.Key, false) }) return true, tx.Commit() } @@ -1785,12 +1857,12 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, s.lockDelayLock.Unlock() }) } + tx.Defer(func() { s.notifyKV(kv.Key, false) }) } if len(pairs) > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) } return nil } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index c115939f0..de4d4ebbe 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1379,6 +1379,45 @@ func TestNodeDump(t *testing.T) { } } +func TestKVSSet_Watch(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + notify3 := make(chan struct{}, 1) + + store.WatchKV("", notify1) + store.WatchKV("foo/", notify2) + store.WatchKV("foo/bar", notify3) + + // Create the entry + d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that we've fired notify1 and notify2 + select { + case <-notify1: + default: + t.Fatalf("should notify root") + } + select { + case <-notify2: + default: + t.Fatalf("should notify foo/") + } + select { + case <-notify3: + t.Fatalf("should not notify foo/bar") + default: + } +} + func TestKVSSet_Get(t *testing.T) { store, err := testStateStore() if err != nil { @@ -1481,11 +1520,21 @@ func TestKVSDelete(t *testing.T) { t.Fatalf("err: %v", err) } + notify1 := make(chan struct{}, 1) + store.WatchKV("/", notify1) + // Delete the entry if err := store.KVSDelete(1020, "/foo"); err != nil { t.Fatalf("err: %v", err) } + // Check that we've fired notify1 + select { + case <-notify1: + default: + t.Fatalf("should notify /") + } + // Should not exist idx, d, err := store.KVSGet("/foo") if err != nil { @@ -1938,6 +1987,14 @@ func TestKVSDeleteTree(t *testing.T) { gc.SetEnabled(true) store.gc = gc + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + notify3 := make(chan struct{}, 1) + + store.WatchKV("", notify1) + store.WatchKV("/web/sub", notify2) + store.WatchKV("/other", notify3) + // Should not exist err = store.KVSDeleteTree(1000, "/web") if err != nil { @@ -1993,6 +2050,23 @@ func TestKVSDeleteTree(t *testing.T) { } } + // Check that we've fired notify1 and notify2 + select { + case <-notify1: + default: + t.Fatalf("should notify root") + } + select { + case <-notify2: + default: + t.Fatalf("should notify /web/sub") + } + select { + case <-notify3: + t.Fatalf("should not notify /other") + default: + } + // Check that we get a delete select { case idx := <-gc.ExpireCh(): @@ -2560,7 +2634,6 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) { t.Fatalf("err: %v", err) } defer store.Close() - if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -2588,6 +2661,9 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) { t.Fatalf("unexpected fail") } + notify1 := make(chan struct{}, 1) + store.WatchKV("/f", notify1) + // Delete the node if err := store.DeleteNode(6, "foo"); err != nil { t.Fatalf("err: %v", err) @@ -2605,6 +2681,13 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) { t.Fatalf("bad: %v", *d2) } + // Should notify of update + select { + case <-notify1: + default: + t.Fatalf("should notify /f") + } + // Key should have a lock delay expires := store.KVSLockDelay("/foo") if expires.Before(time.Now().Add(30 * time.Millisecond)) { @@ -2647,6 +2730,9 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) { t.Fatalf("unexpected fail") } + notify1 := make(chan struct{}, 1) + store.WatchKV("/f", notify1) + // Delete the node if err := store.DeleteNode(6, "foo"); err != nil { t.Fatalf("err: %v", err) @@ -2657,6 +2743,13 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) { if d2 != nil { t.Fatalf("unexpected undeleted key") } + + // Should notify of update + select { + case <-notify1: + default: + t.Fatalf("should notify /f") + } } func TestACLSet_Get(t *testing.T) {