From 5b91b6f06e93c5bc46cba6cb58d86b1a7fc369f0 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 21 Apr 2014 11:04:52 -0700 Subject: [PATCH] consul: Moving QueryMeta handling into blockingRPC --- consul/catalog_endpoint.go | 8 ++++---- consul/health_endpoint.go | 8 ++++---- consul/kvs_endpoint.go | 4 ++-- consul/rpc.go | 8 ++++++-- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 6a65f1534..a1ec50863 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -98,9 +98,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde // Get the local state state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("Nodes"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Nodes = state.Nodes() return reply.Index, nil }) @@ -115,9 +115,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I // Get the current nodes state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("Services"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.Services = state.Services() return reply.Index, nil }) @@ -137,9 +137,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() err := c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ServiceNodes"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) } else { @@ -175,9 +175,9 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() return c.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("NodeServices"), func() (uint64, error) { - c.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.NodeServices = state.NodeServices(args.Node) return reply.Index, nil }) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index fee6ef995..c081a1afd 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -21,9 +21,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // Get the state specific checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ChecksInState"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ChecksInState(args.State) return reply.Index, nil }) @@ -39,9 +39,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, // Get the node checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("NodeChecks"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) return reply.Index, nil }) @@ -63,9 +63,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() return h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("ServiceChecks"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) return reply.Index, nil }) @@ -85,9 +85,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() err := h.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("CheckServiceNodes"), func() (uint64, error) { - h.srv.setQueryMeta(&reply.QueryMeta) if args.TagFilter { reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) } else { diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index fa99e3c0e..0e1ed1ead 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -51,9 +51,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() return k.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("KVSGet"), func() (uint64, error) { - k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSGet(args.Key) if err != nil { return 0, err @@ -84,9 +84,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() return k.srv.blockingRPC(&args.BlockingQuery, + &reply.QueryMeta, state.QueryTables("KVSList"), func() (uint64, error) { - k.srv.setQueryMeta(&reply.QueryMeta) index, ent, err := state.KVSList(args.Key) if err != nil { return 0, err diff --git a/consul/rpc.go b/consul/rpc.go index a80840e22..afe0c5152 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -203,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. -func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error { +func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta, + tables MDBTables, run func() (uint64, error)) error { var timeout <-chan time.Time var notifyCh chan struct{} @@ -239,8 +240,11 @@ SETUP_NOTIFY: s.fsm.State().Watch(tables, notifyCh) } - // Run the query function RUN_QUERY: + // Update the query meta data + s.setQueryMeta(m) + + // Run the query function idx, err := run() // Check for minimum query time