diff --git a/consul/rpc.go b/consul/rpc.go index 315b7f1d2..0e1c88132 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/memberlist" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -422,6 +423,59 @@ RUN_QUERY: return err } +// queryFn is used to perform a query operation. If a re-query is needed, the +// passed-in watch set will be used to block for changes. +type queryFn func(memdb.WatchSet) error + +// blockingQuery is used to process a potentially blocking query operation. +func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, + fn queryFn) error { + var timeout *time.Timer + + // Fast path right to the non-blocking query. + if queryOpts.MinQueryIndex == 0 { + goto RUN_QUERY + } + + // Restrict the max query time, and ensure there is always one. + if queryOpts.MaxQueryTime > maxQueryTime { + queryOpts.MaxQueryTime = maxQueryTime + } else if queryOpts.MaxQueryTime <= 0 { + queryOpts.MaxQueryTime = defaultQueryTime + } + + // Apply a small amount of jitter to the request. + queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction) + + // Setup a query timeout. + timeout = time.NewTimer(queryOpts.MaxQueryTime) + defer timeout.Stop() + +RUN_QUERY: + // Update the query metadata. + s.setQueryMeta(queryMeta) + + // If the read must be consistent we verify that we are still the leader. + if queryOpts.RequireConsistent { + if err := s.consistentRead(); err != nil { + return err + } + } + + // Run the query. + metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) + ws := memdb.NewWatchSet() + err := fn(ws) + + // Block up to the timeout if we didn't see anything fresh. + if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex { + if expired := ws.Watch(timeout.C); !expired { + goto RUN_QUERY + } + } + return err +} + // setQueryMeta is used to populate the QueryMeta data for an RPC call func (s *Server) setQueryMeta(m *structs.QueryMeta) { if s.IsLeader() { diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 5a3c78171..56c31aef7 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-memdb" ) func testUUID() string { @@ -122,6 +123,15 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { } } +// watchFired is a helper for unit tests that returns if the given watch set +// fired (it doesn't care which watch actually fired). This uses a fixed +// 1 ms timeout since we already expect the event happened before calling +// this and just need to distinguish a fire from a timeout. +func watchFired(ws memdb.WatchSet) bool { + timedOut := ws.Watch(time.After(1 * time.Millisecond)) + return !timedOut +} + func TestStateStore_Restore_Abort(t *testing.T) { s := testStateStore(t)