rpc: refactor blocking query

To remove the TODO and make the function more readable.

In general this reduces the scope of variables, making them easier to reason about.
It also introduces more early returns so that we can see the flow from the structure
of the function.
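
Schematically, the refactor replaces the shared RUN_QUERY label-and-goto flow with an early return for the non-blocking case and an explicit retry loop for the blocking case. The outline below is a simplified sketch of that shape only; the names (query, runOnce, minIndex, deadline) are illustrative stand-ins rather than code from this commit — see the full diff below for the real change.

package sketch

import "time"

// query illustrates the control-flow change only (illustrative names, not Consul code):
// an early return handles the non-blocking case, and an explicit loop replaces the
// RUN_QUERY label/goto for the blocking case.
func query(minIndex uint64, deadline time.Time, runOnce func() (uint64, error)) error {
	// Non-blocking: handled completely up front, so the setup below
	// only exists for blocking queries.
	if minIndex == 0 {
		_, err := runOnce()
		return err
	}
	// Blocking: the retry flow is visible from the loop structure.
	for {
		index, err := runOnce()
		if err != nil {
			return err
		}
		if index > minIndex {
			return nil // saw data newer than the caller already has
		}
		if time.Now().After(deadline) {
			return nil // timed out; return what we have, as the real loop does
		}
		time.Sleep(10 * time.Millisecond) // stand-in for waiting on a watch set
	}
}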
Daniel Nephin 2022-01-12 17:46:12 -05:00
parent 0d0b55bd1b
commit 6bf8efe607
1 changed file with 50 additions and 51 deletions


@@ -919,22 +919,26 @@ type queryFn func(memdb.WatchSet, *state.Store) error
 // blockingQuery is used to process a potentially blocking query operation.
 func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
-	var cancel func()
 	var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
-	var queriesBlocking uint64
-	var queryTimeout time.Duration
 	// Instrument all queries run
 	metrics.IncrCounter([]string{"rpc", "query"}, 1)
 	minQueryIndex := queryOpts.GetMinQueryIndex()
-	// Fast path right to the non-blocking query.
+	// Perform a non-blocking query
 	if minQueryIndex == 0 {
-		goto RUN_QUERY
+		if queryOpts.GetRequireConsistent() {
+			if err := s.consistentRead(); err != nil {
+				return err
+			}
+		}
+		var ws memdb.WatchSet
+		err := fn(ws, s.fsm.State())
+		s.setQueryMeta(queryMeta, queryOpts.GetToken())
+		return err
 	}
-	queryTimeout = queryOpts.GetMaxQueryTime()
+	queryTimeout := queryOpts.GetMaxQueryTime()
 	// Restrict the max query time, and ensure there is always one.
 	if queryTimeout > s.config.MaxQueryTime {
 		queryTimeout = s.config.MaxQueryTime
@@ -946,64 +950,57 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
 	queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
 	// wrap the base context with a deadline
-	ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
+	ctx, cancel := context.WithTimeout(ctx, queryTimeout)
 	defer cancel()
 	// instrument blockingQueries
 	// atomic inc our server's count of in-flight blockingQueries and store the new value
-	queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
+	queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1)
 	// atomic dec when we return from blockingQuery()
 	defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
 	// set the gauge directly to the new value of s.blockingQueries
 	metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))
-RUN_QUERY:
-	// Setup blocking loop
-	// Validate
-	// If the read must be consistent we verify that we are still the leader.
-	if queryOpts.GetRequireConsistent() {
-		if err := s.consistentRead(); err != nil {
-			return err
+	for {
+		if queryOpts.GetRequireConsistent() {
+			if err := s.consistentRead(); err != nil {
+				return err
+			}
 		}
-	}
-	// Run query
-	// Operate on a consistent set of state. This makes sure that the
-	// abandon channel goes with the state that the caller is using to
-	// build watches.
-	state := s.fsm.State()
-	// We can skip all watch tracking if this isn't a blocking query.
-	var ws memdb.WatchSet
-	if minQueryIndex > 0 {
-		ws = memdb.NewWatchSet()
+		// Operate on a consistent set of state. This makes sure that the
+		// abandon channel goes with the state that the caller is using to
+		// build watches.
+		state := s.fsm.State()
+		ws := memdb.NewWatchSet()
 		// This channel will be closed if a snapshot is restored and the
 		// whole state store is abandoned.
 		ws.Add(state.AbandonCh())
-	}
-	// Execute the queryFn
-	err := fn(ws, state)
+		err := fn(ws, state)
+		s.setQueryMeta(queryMeta, queryOpts.GetToken())
+		if err != nil {
+			return err
+		}
-	// Update the query metadata.
-	s.setQueryMeta(queryMeta, queryOpts.GetToken())
+		// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
+		// blocking was requested by client, NOT meta.Index since the state function
+		// might return zero if something is not initialized and care wasn't taken to
+		// handle that special case (in practice this happened a lot so fixing it
+		// systematically here beats trying to remember to add zero checks in every
+		// state method). We also need to ensure that unless there is an error, we
+		// return an index > 0 otherwise the client will never block and burn CPU and
+		// requests.
+		if queryMeta.GetIndex() < 1 {
+			queryMeta.SetIndex(1)
+		}
-	// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
-	// blocking was requested by client, NOT meta.Index since the state function
-	// might return zero if something is not initialized and care wasn't taken to
-	// handle that special case (in practice this happened a lot so fixing it
-	// systematically here beats trying to remember to add zero checks in every
-	// state method). We also need to ensure that unless there is an error, we
-	// return an index > 0 otherwise the client will never block and burn CPU and
-	// requests.
-	if err == nil && queryMeta.GetIndex() < 1 {
-		queryMeta.SetIndex(1)
-	}
-	// block up to the timeout if we don't see anything fresh.
-	if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
+		if queryMeta.GetIndex() > minQueryIndex {
+			return nil
+		}
+		// block up to the timeout if we don't see anything fresh.
 		if err := ws.WatchCtx(ctx); err == nil {
 			// a non-nil error only occurs when the context is cancelled
@@ -1014,13 +1011,15 @@ RUN_QUERY:
 			// case.
 			select {
 			case <-state.AbandonCh():
+				return nil
 			default:
 				// loop back and look for an update again
-				goto RUN_QUERY
 			}
 		}
+		if ctx.Err() != nil {
+			return nil
+		}
 	}
-	return err
 }
 // setQueryMeta is used to populate the QueryMeta data for an RPC call
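
For context on how this loop is driven: RPC endpoints call blockingQuery with the request's QueryOptions, the reply's QueryMeta, and a queryFn closure that reads from the state store and registers watches on the WatchSet it is given. The sketch below shows that calling shape only — the Example endpoint, its reply type, and the state.ExampleItems method are hypothetical stand-ins; just the blockingQuery and queryFn signatures come from the code above.

// Hypothetical endpoint illustrating the blockingQuery calling convention.
type Example struct {
	srv *Server
}

type ExampleReply struct {
	structs.QueryMeta
	Items []string
}

func (e *Example) List(args *structs.DCSpecificRequest, reply *ExampleReply) error {
	return e.srv.blockingQuery(
		&args.QueryOptions, // carries MinQueryIndex, MaxQueryTime, RequireConsistent and the token
		&reply.QueryMeta,   // the closure sets Index; the loop reruns until Index > MinQueryIndex or the timeout fires
		func(ws memdb.WatchSet, st *state.Store) error {
			// Read from the state store and register watches on ws so the
			// blocking loop wakes when the data changes. ExampleItems is a
			// hypothetical state method returning (index, items, error).
			index, items, err := st.ExampleItems(ws)
			if err != nil {
				return err
			}
			reply.Index, reply.Items = index, items
			return nil
		},
	)
}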