consul: Moving QueryMeta handling into blockingRPC

This commit is contained in:
Armon Dadgar 2014-04-21 11:04:52 -07:00
parent f7bcffe627
commit 5b91b6f06e
4 changed files with 16 additions and 12 deletions

View File

@ -98,9 +98,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
// Get the local state // Get the local state
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("Nodes"), state.QueryTables("Nodes"),
func() (uint64, error) { func() (uint64, error) {
c.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.Nodes = state.Nodes() reply.Index, reply.Nodes = state.Nodes()
return reply.Index, nil return reply.Index, nil
}) })
@ -115,9 +115,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
// Get the current nodes // Get the current nodes
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("Services"), state.QueryTables("Services"),
func() (uint64, error) { func() (uint64, error) {
c.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.Services = state.Services() reply.Index, reply.Services = state.Services()
return reply.Index, nil return reply.Index, nil
}) })
@ -137,9 +137,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Get the nodes // Get the nodes
state := c.srv.fsm.State() state := c.srv.fsm.State()
err := c.srv.blockingRPC(&args.BlockingQuery, err := c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"), state.QueryTables("ServiceNodes"),
func() (uint64, error) { func() (uint64, error) {
c.srv.setQueryMeta(&reply.QueryMeta)
if args.TagFilter { if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {
@ -175,9 +175,9 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services // Get the node services
state := c.srv.fsm.State() state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.BlockingQuery, return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("NodeServices"), state.QueryTables("NodeServices"),
func() (uint64, error) { func() (uint64, error) {
c.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.NodeServices = state.NodeServices(args.Node) reply.Index, reply.NodeServices = state.NodeServices(args.Node)
return reply.Index, nil return reply.Index, nil
}) })

View File

@ -21,9 +21,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
// Get the state specific checks // Get the state specific checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ChecksInState"), state.QueryTables("ChecksInState"),
func() (uint64, error) { func() (uint64, error) {
h.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.HealthChecks = state.ChecksInState(args.State) reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
return reply.Index, nil return reply.Index, nil
}) })
@ -39,9 +39,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
// Get the node checks // Get the node checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("NodeChecks"), state.QueryTables("NodeChecks"),
func() (uint64, error) { func() (uint64, error) {
h.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
return reply.Index, nil return reply.Index, nil
}) })
@ -63,9 +63,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
// Get the service checks // Get the service checks
state := h.srv.fsm.State() state := h.srv.fsm.State()
return h.srv.blockingRPC(&args.BlockingQuery, return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ServiceChecks"), state.QueryTables("ServiceChecks"),
func() (uint64, error) { func() (uint64, error) {
h.srv.setQueryMeta(&reply.QueryMeta)
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
return reply.Index, nil return reply.Index, nil
}) })
@ -85,9 +85,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
// Get the nodes // Get the nodes
state := h.srv.fsm.State() state := h.srv.fsm.State()
err := h.srv.blockingRPC(&args.BlockingQuery, err := h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("CheckServiceNodes"), state.QueryTables("CheckServiceNodes"),
func() (uint64, error) { func() (uint64, error) {
h.srv.setQueryMeta(&reply.QueryMeta)
if args.TagFilter { if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else { } else {

View File

@ -51,9 +51,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery, return k.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("KVSGet"), state.QueryTables("KVSGet"),
func() (uint64, error) { func() (uint64, error) {
k.srv.setQueryMeta(&reply.QueryMeta)
index, ent, err := state.KVSGet(args.Key) index, ent, err := state.KVSGet(args.Key)
if err != nil { if err != nil {
return 0, err return 0, err
@ -84,9 +84,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
// Get the local state // Get the local state
state := k.srv.fsm.State() state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery, return k.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("KVSList"), state.QueryTables("KVSList"),
func() (uint64, error) { func() (uint64, error) {
k.srv.setQueryMeta(&reply.QueryMeta)
index, ent, err := state.KVSList(args.Key) index, ent, err := state.KVSList(args.Key)
if err != nil { if err != nil {
return 0, err return 0, err

View File

@ -203,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// blockingRPC is used for queries that need to wait for a // blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes. // minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta,
tables MDBTables, run func() (uint64, error)) error {
var timeout <-chan time.Time var timeout <-chan time.Time
var notifyCh chan struct{} var notifyCh chan struct{}
@ -239,8 +240,11 @@ SETUP_NOTIFY:
s.fsm.State().Watch(tables, notifyCh) s.fsm.State().Watch(tables, notifyCh)
} }
// Run the query function
RUN_QUERY: RUN_QUERY:
// Update the query meta data
s.setQueryMeta(m)
// Run the query function
idx, err := run() idx, err := run()
// Check for minimum query time // Check for minimum query time