Adds new variant of blocking query wrapper with WatchSet support.
This commit is contained in:
parent
b4272f31ad
commit
085672a126
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/hashicorp/consul/consul/state"
|
"github.com/hashicorp/consul/consul/state"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/memberlist"
|
"github.com/hashicorp/memberlist"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/yamux"
|
"github.com/hashicorp/yamux"
|
||||||
|
@ -422,6 +423,59 @@ RUN_QUERY:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// queryFn is used to perform a query operation. If a re-query is needed, the
|
||||||
|
// passed-in watch set will be used to block for changes.
|
||||||
|
type queryFn func(memdb.WatchSet) error
|
||||||
|
|
||||||
|
// blockingQuery is used to process a potentially blocking query operation.
|
||||||
|
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
||||||
|
fn queryFn) error {
|
||||||
|
var timeout *time.Timer
|
||||||
|
|
||||||
|
// Fast path right to the non-blocking query.
|
||||||
|
if queryOpts.MinQueryIndex == 0 {
|
||||||
|
goto RUN_QUERY
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restrict the max query time, and ensure there is always one.
|
||||||
|
if queryOpts.MaxQueryTime > maxQueryTime {
|
||||||
|
queryOpts.MaxQueryTime = maxQueryTime
|
||||||
|
} else if queryOpts.MaxQueryTime <= 0 {
|
||||||
|
queryOpts.MaxQueryTime = defaultQueryTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply a small amount of jitter to the request.
|
||||||
|
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
|
||||||
|
|
||||||
|
// Setup a query timeout.
|
||||||
|
timeout = time.NewTimer(queryOpts.MaxQueryTime)
|
||||||
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
RUN_QUERY:
|
||||||
|
// Update the query metadata.
|
||||||
|
s.setQueryMeta(queryMeta)
|
||||||
|
|
||||||
|
// If the read must be consistent we verify that we are still the leader.
|
||||||
|
if queryOpts.RequireConsistent {
|
||||||
|
if err := s.consistentRead(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the query.
|
||||||
|
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
err := fn(ws)
|
||||||
|
|
||||||
|
// Block up to the timeout if we didn't see anything fresh.
|
||||||
|
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
||||||
|
if expired := ws.Watch(timeout.C); !expired {
|
||||||
|
goto RUN_QUERY
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||||
func (s *Server) setQueryMeta(m *structs.QueryMeta) {
|
func (s *Server) setQueryMeta(m *structs.QueryMeta) {
|
||||||
if s.IsLeader() {
|
if s.IsLeader() {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testUUID() string {
|
func testUUID() string {
|
||||||
|
@ -122,6 +123,15 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watchFired is a helper for unit tests that returns if the given watch set
|
||||||
|
// fired (it doesn't care which watch actually fired). This uses a fixed
|
||||||
|
// 1 ms timeout since we already expect the event happened before calling
|
||||||
|
// this and just need to distinguish a fire from a timeout.
|
||||||
|
func watchFired(ws memdb.WatchSet) bool {
|
||||||
|
timedOut := ws.Watch(time.After(1 * time.Millisecond))
|
||||||
|
return !timedOut
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_Restore_Abort(t *testing.T) {
|
func TestStateStore_Restore_Abort(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue