consul: Optimize KV watching

This commit is contained in:
Armon Dadgar 2015-01-05 16:41:57 -08:00
parent 7a63f822a3
commit 0ea4f3d846
2 changed files with 72 additions and 29 deletions

View File

@ -90,10 +90,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSGet"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
tables: nil,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return err
@ -115,7 +118,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Entries = structs.DirEntries{ent}
}
return nil
})
},
}
return k.srv.blockingRPCOpt(&opts)
}
// List is used to list all keys with a given prefix
@ -131,10 +136,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSList"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
tables: nil,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
tombIndex, index, ent, err := state.KVSList(args.Key)
if err != nil {
return err
@ -166,7 +174,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Entries = ent
}
return nil
})
},
}
return k.srv.blockingRPCOpt(&opts)
}
// ListKeys is used to list all keys with a given prefix to a seperator
@ -182,10 +192,13 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSListKeys"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
tables: nil,
kvWatch: true,
kvPrefix: args.Prefix,
run: func() error {
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
reply.Index = index
if acl != nil {
@ -193,5 +206,8 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
}
reply.Keys = keys
return err
})
},
}
return k.srv.blockingRPCOpt(&opts)
}

View File

@ -289,57 +289,84 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error {
opts := blockingRPCOptions{
queryOpts: b,
queryMeta: m,
tables: tables,
run: run,
}
return s.blockingRPCOpt(&opts)
}
// blockingRPCOptions is used to parameterize blockingRPCOpt since
// it takes so many options. It should be prefered over blockingRPC.
type blockingRPCOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
tables MDBTables
kvWatch bool
kvPrefix string
run func() error
}
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be prefered over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout <-chan time.Time
var notifyCh chan struct{}
// Fast path non-blocking
if b.MinQueryIndex == 0 {
if opts.queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Sanity check that we have tables to block on
if len(tables) == 0 {
if len(opts.tables) == 0 && !opts.kvWatch {
panic("no tables to block on")
}
// Restrict the max query time
if b.MaxQueryTime > maxQueryTime {
b.MaxQueryTime = maxQueryTime
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
}
// Ensure a time limit is set if we have an index
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 {
b.MaxQueryTime = maxQueryTime
if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
opts.queryOpts.MaxQueryTime = maxQueryTime
}
// Setup a query timeout
if b.MaxQueryTime > 0 {
timeout = time.After(b.MaxQueryTime)
if opts.queryOpts.MaxQueryTime > 0 {
timeout = time.After(opts.queryOpts.MaxQueryTime)
}
// Setup a notification channel for changes
SETUP_NOTIFY:
if b.MinQueryIndex > 0 {
if opts.queryOpts.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1)
s.fsm.State().Watch(tables, notifyCh)
state := s.fsm.State()
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
}
RUN_QUERY:
// Update the query meta data
s.setQueryMeta(m)
s.setQueryMeta(opts.queryMeta)
// Check if query must be consistent
if b.RequireConsistent {
if opts.queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query function
err := run()
err := opts.run()
// Check for minimum query time
if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex {
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY