Merge pull request #12109 from hashicorp/dnephin/blocking-query-1
rpc: make blockingQuery easier to read
This commit is contained in:
commit
d3324d0d27
|
@ -919,108 +919,74 @@ type queryFn func(memdb.WatchSet, *state.Store) error
|
|||
|
||||
// blockingQuery is used to process a potentially blocking query operation.
|
||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||
var cancel func()
|
||||
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
||||
|
||||
var queriesBlocking uint64
|
||||
var queryTimeout time.Duration
|
||||
|
||||
// Instrument all queries run
|
||||
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
||||
|
||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||
// Fast path right to the non-blocking query.
|
||||
// Perform a non-blocking query
|
||||
if minQueryIndex == 0 {
|
||||
goto RUN_QUERY
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var ws memdb.WatchSet
|
||||
err := fn(ws, s.fsm.State())
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
return err
|
||||
}
|
||||
|
||||
queryTimeout = queryOpts.GetMaxQueryTime()
|
||||
// Restrict the max query time, and ensure there is always one.
|
||||
if queryTimeout > s.config.MaxQueryTime {
|
||||
queryTimeout = s.config.MaxQueryTime
|
||||
} else if queryTimeout <= 0 {
|
||||
queryTimeout = s.config.DefaultQueryTime
|
||||
}
|
||||
|
||||
// Apply a small amount of jitter to the request.
|
||||
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
||||
|
||||
// wrap the base context with a deadline
|
||||
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
|
||||
timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// instrument blockingQueries
|
||||
// atomic inc our server's count of in-flight blockingQueries and store the new value
|
||||
queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1)
|
||||
// atomic dec when we return from blockingQuery()
|
||||
count := atomic.AddUint64(&s.queriesBlocking, 1)
|
||||
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count))
|
||||
// decrement the count when the function returns.
|
||||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||
// set the gauge directly to the new value of s.blockingQueries
|
||||
metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking))
|
||||
|
||||
RUN_QUERY:
|
||||
// Setup blocking loop
|
||||
|
||||
// Validate
|
||||
// If the read must be consistent we verify that we are still the leader.
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
for {
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run query
|
||||
|
||||
// Operate on a consistent set of state. This makes sure that the
|
||||
// abandon channel goes with the state that the caller is using to
|
||||
// build watches.
|
||||
state := s.fsm.State()
|
||||
|
||||
// We can skip all watch tracking if this isn't a blocking query.
|
||||
var ws memdb.WatchSet
|
||||
if minQueryIndex > 0 {
|
||||
ws = memdb.NewWatchSet()
|
||||
// Operate on a consistent set of state. This makes sure that the
|
||||
// abandon channel goes with the state that the caller is using to
|
||||
// build watches.
|
||||
state := s.fsm.State()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
// This channel will be closed if a snapshot is restored and the
|
||||
// whole state store is abandoned.
|
||||
ws.Add(state.AbandonCh())
|
||||
}
|
||||
|
||||
// Execute the queryFn
|
||||
err := fn(ws, state)
|
||||
err := fn(ws, state)
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the query metadata.
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
if queryMeta.GetIndex() > minQueryIndex {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
||||
// blocking was requested by client, NOT meta.Index since the state function
|
||||
// might return zero if something is not initialized and care wasn't taken to
|
||||
// handle that special case (in practice this happened a lot so fixing it
|
||||
// systematically here beats trying to remember to add zero checks in every
|
||||
// state method). We also need to ensure that unless there is an error, we
|
||||
// return an index > 0 otherwise the client will never block and burn CPU and
|
||||
// requests.
|
||||
if err == nil && queryMeta.GetIndex() < 1 {
|
||||
queryMeta.SetIndex(1)
|
||||
}
|
||||
// block up to the timeout if we don't see anything fresh.
|
||||
if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex {
|
||||
if err := ws.WatchCtx(ctx); err == nil {
|
||||
// a non-nil error only occurs when the context is cancelled
|
||||
// block until something changes, or the timeout
|
||||
if err := ws.WatchCtx(ctx); err != nil {
|
||||
// exit if we've reached the timeout, or other cancellation
|
||||
return nil
|
||||
}
|
||||
|
||||
// If a restore may have woken us up then bail out from
|
||||
// the query immediately. This is slightly race-ey since
|
||||
// this might have been interrupted for other reasons,
|
||||
// but it's OK to kick it back to the caller in either
|
||||
// case.
|
||||
select {
|
||||
case <-state.AbandonCh():
|
||||
default:
|
||||
// loop back and look for an update again
|
||||
goto RUN_QUERY
|
||||
}
|
||||
// exit if the state store has been abandoned
|
||||
select {
|
||||
case <-state.AbandonCh():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||
|
@ -1035,6 +1001,17 @@ func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
|
|||
m.SetKnownLeader(s.raft.Leader() != "")
|
||||
}
|
||||
maskResultsFilteredByACLs(token, m)
|
||||
|
||||
// Always set a non-zero QueryMeta.Index. Generally we expect the
|
||||
// QueryMeta.Index to be set to structs.RaftIndex.ModifyIndex. If the query
|
||||
// returned no results we expect it to be set to the max index of the table,
|
||||
// however we can't guarantee this always happens.
|
||||
// To prevent a client from accidentally performing many non-blocking queries
|
||||
// (which causes lots of unnecessary load), we always set a default value of 1.
|
||||
// This is sufficient to prevent the unnecessary load in most cases.
|
||||
if m.GetIndex() < 1 {
|
||||
m.SetIndex(1)
|
||||
}
|
||||
}
|
||||
|
||||
// consistentRead is used to ensure we do not perform a stale
|
||||
|
@ -1070,6 +1047,22 @@ func (s *Server) consistentRead() error {
|
|||
return structs.ErrNotReadyForConsistentReads
|
||||
}
|
||||
|
||||
// rpcQueryTimeout calculates the timeout for the query, ensures it is
|
||||
// constrained to the configured limit, and adds jitter to prevent multiple
|
||||
// blocking queries from all timing out at the same time.
|
||||
func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
|
||||
// Restrict the max query time, and ensure there is always one.
|
||||
if queryTimeout > s.config.MaxQueryTime {
|
||||
queryTimeout = s.config.MaxQueryTime
|
||||
} else if queryTimeout <= 0 {
|
||||
queryTimeout = s.config.DefaultQueryTime
|
||||
}
|
||||
|
||||
// Apply a small amount of jitter to the request.
|
||||
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
||||
return queryTimeout
|
||||
}
|
||||
|
||||
// maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the
|
||||
// request is unauthenticated, to limit information leaking.
|
||||
//
|
||||
|
|
|
@ -236,7 +236,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||
// this should not block still.
|
||||
{
|
||||
t.Run("non-blocking query", func(t *testing.T) {
|
||||
var opts structs.QueryOptions
|
||||
var meta structs.QueryMeta
|
||||
var calls int
|
||||
|
@ -244,16 +244,13 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
calls++
|
||||
return nil
|
||||
}
|
||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if calls != 1 {
|
||||
t.Fatalf("bad: %d", calls)
|
||||
}
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that gets woken up and loops around once.
|
||||
{
|
||||
t.Run("blocking query - single loop", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 3,
|
||||
}
|
||||
|
@ -272,13 +269,10 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
calls++
|
||||
return nil
|
||||
}
|
||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if calls != 2 {
|
||||
t.Fatalf("bad: %d", calls)
|
||||
}
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
||||
// no state yet). This should still return an empty response immediately, but
|
||||
|
@ -289,7 +283,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
// covered by tests but eventually when hit in the wild causes blocking
|
||||
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
||||
// systematically does the right thing to prevent future bugs like that.
|
||||
{
|
||||
t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 0,
|
||||
}
|
||||
|
@ -327,11 +321,11 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
assert.True(t, t1.Sub(t0) > 20*time.Millisecond,
|
||||
"should have actually blocked waiting for timeout")
|
||||
|
||||
}
|
||||
})
|
||||
|
||||
// Perform a query that blocks and gets interrupted when the state store
|
||||
// is abandoned.
|
||||
{
|
||||
t.Run("blocking query interrupted by abandonCh", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 3,
|
||||
}
|
||||
|
@ -360,13 +354,10 @@ func TestRPC_blockingQuery(t *testing.T) {
|
|||
calls++
|
||||
return nil
|
||||
}
|
||||
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if calls != 1 {
|
||||
t.Fatalf("bad: %d", calls)
|
||||
}
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
|
|
|
@ -817,6 +817,7 @@ func TestTxn_Read(t *testing.T) {
|
|||
},
|
||||
QueryMeta: structs.QueryMeta{
|
||||
KnownLeader: true,
|
||||
Index: 1,
|
||||
},
|
||||
}
|
||||
require.Equal(t, expected, out)
|
||||
|
|
|
@ -370,7 +370,9 @@ func (q QueryBackend) String() string {
|
|||
// QueryMeta allows a query response to include potentially
|
||||
// useful metadata about a query
|
||||
type QueryMeta struct {
|
||||
// Index in the raft log of the latest item returned by the query.
|
||||
// Index in the raft log of the latest item returned by the query. If the
|
||||
// query did not return any results the Index will be a value that will
|
||||
// change when a new item is added.
|
||||
Index uint64
|
||||
|
||||
// If AllowStale is used, this is time elapsed since
|
||||
|
|
|
@ -10,11 +10,12 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
|
||||
|
@ -385,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
|
|||
},
|
||||
QueryMeta: structs.QueryMeta{
|
||||
KnownLeader: true,
|
||||
Index: 1,
|
||||
},
|
||||
}
|
||||
assert.Equal(t, expected, txnResp)
|
||||
|
|
Loading…
Reference in New Issue