diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index e4b5a36b6..0fa33131f 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -90,10 +90,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSGet"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + tables: nil, + kvWatch: true, + kvPrefix: args.Key, + run: func() error { index, ent, err := state.KVSGet(args.Key) if err != nil { return err @@ -115,7 +118,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er reply.Entries = structs.DirEntries{ent} } return nil - }) + }, + } + return k.srv.blockingRPCOpt(&opts) } // List is used to list all keys with a given prefix @@ -131,10 +136,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSList"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + tables: nil, + kvWatch: true, + kvPrefix: args.Key, + run: func() error { tombIndex, index, ent, err := state.KVSList(args.Key) if err != nil { return err @@ -166,7 +174,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Entries = ent } return nil - }) + }, + } + return k.srv.blockingRPCOpt(&opts) } // ListKeys is used to list all keys with a given prefix to a seperator @@ -182,10 +192,13 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC(&args.QueryOptions, - &reply.QueryMeta, - state.QueryTables("KVSListKeys"), - func() error { + opts := blockingRPCOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + tables: nil, + kvWatch: true, + kvPrefix: args.Prefix, + run: func() error { index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) reply.Index = index if acl != nil { @@ -193,5 +206,8 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi } reply.Keys = keys return err - }) + + }, + } + return k.srv.blockingRPCOpt(&opts) } diff --git a/consul/rpc.go b/consul/rpc.go index c98ebb60c..0818f842b 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -289,57 +289,84 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // minimum index. This is used to block and wait for changes. func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, tables MDBTables, run func() error) error { + opts := blockingRPCOptions{ + queryOpts: b, + queryMeta: m, + tables: tables, + run: run, + } + return s.blockingRPCOpt(&opts) +} + +// blockingRPCOptions is used to parameterize blockingRPCOpt since +// it takes so many options. It should be prefered over blockingRPC. +type blockingRPCOptions struct { + queryOpts *structs.QueryOptions + queryMeta *structs.QueryMeta + tables MDBTables + kvWatch bool + kvPrefix string + run func() error +} + +// blockingRPCOpt is the replacement for blockingRPC as it allows +// for more parameterization easily. It should be prefered over blockingRPC. +func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { var timeout <-chan time.Time var notifyCh chan struct{} // Fast path non-blocking - if b.MinQueryIndex == 0 { + if opts.queryOpts.MinQueryIndex == 0 { goto RUN_QUERY } // Sanity check that we have tables to block on - if len(tables) == 0 { + if len(opts.tables) == 0 && !opts.kvWatch { panic("no tables to block on") } // Restrict the max query time - if b.MaxQueryTime > maxQueryTime { - b.MaxQueryTime = maxQueryTime + if opts.queryOpts.MaxQueryTime > maxQueryTime { + opts.queryOpts.MaxQueryTime = maxQueryTime } // Ensure a time limit is set if we have an index - if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { - b.MaxQueryTime = maxQueryTime + if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 { + opts.queryOpts.MaxQueryTime = maxQueryTime } // Setup a query timeout - if b.MaxQueryTime > 0 { - timeout = time.After(b.MaxQueryTime) + if opts.queryOpts.MaxQueryTime > 0 { + timeout = time.After(opts.queryOpts.MaxQueryTime) } // Setup a notification channel for changes SETUP_NOTIFY: - if b.MinQueryIndex > 0 { + if opts.queryOpts.MinQueryIndex > 0 { notifyCh = make(chan struct{}, 1) - s.fsm.State().Watch(tables, notifyCh) + state := s.fsm.State() + state.Watch(opts.tables, notifyCh) + if opts.kvWatch { + state.WatchKV(opts.kvPrefix, notifyCh) + } } RUN_QUERY: // Update the query meta data - s.setQueryMeta(m) + s.setQueryMeta(opts.queryMeta) // Check if query must be consistent - if b.RequireConsistent { + if opts.queryOpts.RequireConsistent { if err := s.consistentRead(); err != nil { return err } } // Run the query function - err := run() + err := opts.run() // Check for minimum query time - if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { + if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { select { case <-notifyCh: goto SETUP_NOTIFY