From 55b5fd4e6ada4f76962a339619d9dcc52fb3448c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:18:27 -0700 Subject: [PATCH] consul: Use QueryMeta to simplify blockingRPC interface --- consul/catalog_endpoint.go | 16 ++++++++-------- consul/health_endpoint.go | 16 ++++++++-------- consul/kvs_endpoint.go | 12 ++++++------ consul/rpc.go | 6 +++--- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index a1ec50863..35a08a1c4 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -100,9 +100,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("Nodes"), - func() (uint64, error) { + func() error { reply.Index, reply.Nodes = state.Nodes() - return reply.Index, nil + return nil }) } @@ -117,9 +117,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("Services"), - func() (uint64, error) { + func() error { reply.Index, reply.Services = state.Services() - return reply.Index, nil + return nil }) } @@ -139,13 +139,13 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru err := c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics @@ -177,8 +177,8 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return c.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("NodeServices"), - func() (uint64, error) { + func() error { reply.Index, reply.NodeServices = state.NodeServices(args.Node) - return reply.Index, nil + return nil }) } diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index c081a1afd..67da630d9 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -23,9 +23,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ChecksInState"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ChecksInState(args.State) - return reply.Index, nil + return nil }) } @@ -41,9 +41,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("NodeChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) - return reply.Index, nil + return nil }) } @@ -65,9 +65,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("ServiceChecks"), - func() (uint64, error) { + func() error { reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) - return reply.Index, nil + return nil }) } @@ -87,13 +87,13 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc err := h.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), - func() (uint64, error) { + func() error { if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) } - return reply.Index, nil + return nil }) // Provide some metrics diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 0e1ed1ead..780e52da5 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -53,10 +53,10 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return k.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("KVSGet"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSGet(args.Key) if err != nil { - return 0, err + return err } if ent == nil { // Must provide non-zero index to prevent blocking @@ -71,7 +71,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er reply.Index = ent.ModifyIndex reply.Entries = structs.DirEntries{ent} } - return reply.Index, nil + return nil }) } @@ -86,10 +86,10 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return k.srv.blockingRPC(&args.BlockingQuery, &reply.QueryMeta, state.QueryTables("KVSList"), - func() (uint64, error) { + func() error { index, ent, err := state.KVSList(args.Key) if err != nil { - return 0, err + return err } if len(ent) == 0 { // Must provide non-zero index to prevent blocking @@ -112,6 +112,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e reply.Index = maxIndex reply.Entries = ent } - return reply.Index, nil + return nil }) } diff --git a/consul/rpc.go b/consul/rpc.go index afe0c5152..ffaa9664e 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -204,7 +204,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta, - tables MDBTables, run func() (uint64, error)) error { + tables MDBTables, run func() error) error { var timeout <-chan time.Time var notifyCh chan struct{} @@ -245,10 +245,10 @@ RUN_QUERY: s.setQueryMeta(m) // Run the query function - idx, err := run() + err := run() // Check for minimum query time - if err == nil && idx <= b.MinQueryIndex { + if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex { select { case <-notifyCh: goto SETUP_NOTIFY