consul: Use QueryMeta to simplify blockingRPC interface

This commit is contained in:
Armon Dadgar 2014-04-21 11:18:27 -07:00
parent 959ffb5758
commit 55b5fd4e6a
4 changed files with 25 additions and 25 deletions

View file

@ -100,9 +100,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("Nodes"),
func() (uint64, error) {
func() error {
reply.Index, reply.Nodes = state.Nodes()
return reply.Index, nil
return nil
})
}
@ -117,9 +117,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("Services"),
func() (uint64, error) {
func() error {
reply.Index, reply.Services = state.Services()
return reply.Index, nil
return nil
})
}
@ -139,13 +139,13 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
err := c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"),
func() (uint64, error) {
func() error {
if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
}
return reply.Index, nil
return nil
})
// Provide some metrics
@ -177,8 +177,8 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
return c.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("NodeServices"),
func() (uint64, error) {
func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
return reply.Index, nil
return nil
})
}

View file

@ -23,9 +23,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ChecksInState"),
func() (uint64, error) {
func() error {
reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
return reply.Index, nil
return nil
})
}
@ -41,9 +41,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("NodeChecks"),
func() (uint64, error) {
func() error {
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
return reply.Index, nil
return nil
})
}
@ -65,9 +65,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
return h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("ServiceChecks"),
func() (uint64, error) {
func() error {
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
return reply.Index, nil
return nil
})
}
@ -87,13 +87,13 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
err := h.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("CheckServiceNodes"),
func() (uint64, error) {
func() error {
if args.TagFilter {
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName)
}
return reply.Index, nil
return nil
})
// Provide some metrics

View file

@ -53,10 +53,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
return k.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("KVSGet"),
func() (uint64, error) {
func() error {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return 0, err
return err
}
if ent == nil {
// Must provide non-zero index to prevent blocking
@ -71,7 +71,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
}
return reply.Index, nil
return nil
})
}
@ -86,10 +86,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
return k.srv.blockingRPC(&args.BlockingQuery,
&reply.QueryMeta,
state.QueryTables("KVSList"),
func() (uint64, error) {
func() error {
index, ent, err := state.KVSList(args.Key)
if err != nil {
return 0, err
return err
}
if len(ent) == 0 {
// Must provide non-zero index to prevent blocking
@ -112,6 +112,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Index = maxIndex
reply.Entries = ent
}
return reply.Index, nil
return nil
})
}

View file

@ -204,7 +204,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta,
tables MDBTables, run func() (uint64, error)) error {
tables MDBTables, run func() error) error {
var timeout <-chan time.Time
var notifyCh chan struct{}
@ -245,10 +245,10 @@ RUN_QUERY:
s.setQueryMeta(m)
// Run the query function
idx, err := run()
err := run()
// Check for minimum query time
if err == nil && idx <= b.MinQueryIndex {
if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY