diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index c8fa6846c..6a5078170 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -919,108 +919,74 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { - var cancel func() var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} - var queriesBlocking uint64 - var queryTimeout time.Duration - - // Instrument all queries run metrics.IncrCounter([]string{"rpc", "query"}, 1) minQueryIndex := queryOpts.GetMinQueryIndex() - // Fast path right to the non-blocking query. + // Perform a non-blocking query if minQueryIndex == 0 { - goto RUN_QUERY + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } + } + + var ws memdb.WatchSet + err := fn(ws, s.fsm.State()) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + return err } - queryTimeout = queryOpts.GetMaxQueryTime() - // Restrict the max query time, and ensure there is always one. - if queryTimeout > s.config.MaxQueryTime { - queryTimeout = s.config.MaxQueryTime - } else if queryTimeout <= 0 { - queryTimeout = s.config.DefaultQueryTime - } - - // Apply a small amount of jitter to the request. - queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) - - // wrap the base context with a deadline - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime()) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // instrument blockingQueries - // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) - // atomic dec when we return from blockingQuery() + count := atomic.AddUint64(&s.queriesBlocking, 1) + metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count)) + // decrement the count when the function returns. defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) - // set the gauge directly to the new value of s.blockingQueries - metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) -RUN_QUERY: - // Setup blocking loop - - // Validate - // If the read must be consistent we verify that we are still the leader. - if queryOpts.GetRequireConsistent() { - if err := s.consistentRead(); err != nil { - return err + for { + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } } - } - // Run query - - // Operate on a consistent set of state. This makes sure that the - // abandon channel goes with the state that the caller is using to - // build watches. - state := s.fsm.State() - - // We can skip all watch tracking if this isn't a blocking query. - var ws memdb.WatchSet - if minQueryIndex > 0 { - ws = memdb.NewWatchSet() + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + ws := memdb.NewWatchSet() // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. ws.Add(state.AbandonCh()) - } - // Execute the queryFn - err := fn(ws, state) + err := fn(ws, state) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + if err != nil { + return err + } - // Update the query metadata. - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + if queryMeta.GetIndex() > minQueryIndex { + return nil + } - // Note we check queryOpts.MinQueryIndex is greater than zero to determine if - // blocking was requested by client, NOT meta.Index since the state function - // might return zero if something is not initialized and care wasn't taken to - // handle that special case (in practice this happened a lot so fixing it - // systematically here beats trying to remember to add zero checks in every - // state method). We also need to ensure that unless there is an error, we - // return an index > 0 otherwise the client will never block and burn CPU and - // requests. - if err == nil && queryMeta.GetIndex() < 1 { - queryMeta.SetIndex(1) - } - // block up to the timeout if we don't see anything fresh. - if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { - if err := ws.WatchCtx(ctx); err == nil { - // a non-nil error only occurs when the context is cancelled + // block until something changes, or the timeout + if err := ws.WatchCtx(ctx); err != nil { + // exit if we've reached the timeout, or other cancellation + return nil + } - // If a restore may have woken us up then bail out from - // the query immediately. This is slightly race-ey since - // this might have been interrupted for other reasons, - // but it's OK to kick it back to the caller in either - // case. - select { - case <-state.AbandonCh(): - default: - // loop back and look for an update again - goto RUN_QUERY - } + // exit if the state store has been abandoned + select { + case <-state.AbandonCh(): + return nil + default: } } - return err } // setQueryMeta is used to populate the QueryMeta data for an RPC call @@ -1035,6 +1001,17 @@ func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) { m.SetKnownLeader(s.raft.Leader() != "") } maskResultsFilteredByACLs(token, m) + + // Always set a non-zero QueryMeta.Index. Generally we expect the + // QueryMeta.Index to be set to structs.RaftIndex.ModifyIndex. If the query + // returned no results we expect it to be set to the max index of the table, + // however we can't guarantee this always happens. + // To prevent a client from accidentally performing many non-blocking queries + // (which causes lots of unnecessary load), we always set a default value of 1. + // This is sufficient to prevent the unnecessary load in most cases. + if m.GetIndex() < 1 { + m.SetIndex(1) + } } // consistentRead is used to ensure we do not perform a stale @@ -1070,6 +1047,22 @@ func (s *Server) consistentRead() error { return structs.ErrNotReadyForConsistentReads } +// rpcQueryTimeout calculates the timeout for the query, ensures it is +// constrained to the configured limit, and adds jitter to prevent multiple +// blocking queries from all timing out at the same time. +func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration { + // Restrict the max query time, and ensure there is always one. + if queryTimeout > s.config.MaxQueryTime { + queryTimeout = s.config.MaxQueryTime + } else if queryTimeout <= 0 { + queryTimeout = s.config.DefaultQueryTime + } + + // Apply a small amount of jitter to the request. + queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) + return queryTimeout +} + // maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the // request is unauthenticated, to limit information leaking. // diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 48ce26db0..0782cbade 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -236,7 +236,7 @@ func TestRPC_blockingQuery(t *testing.T) { // Perform a non-blocking query. Note that it's significant that the meta has // a zero index in response - the implied opts.MinQueryIndex is also zero but // this should not block still. - { + t.Run("non-blocking query", func(t *testing.T) { var opts structs.QueryOptions var meta structs.QueryMeta var calls int @@ -244,16 +244,13 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 1 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) // Perform a blocking query that gets woken up and loops around once. - { + t.Run("blocking query - single loop", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 3, } @@ -272,13 +269,10 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 2 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) // Perform a blocking query that returns a zero index from blocking func (e.g. // no state yet). This should still return an empty response immediately, but @@ -289,7 +283,7 @@ func TestRPC_blockingQuery(t *testing.T) { // covered by tests but eventually when hit in the wild causes blocking // clients to busy loop and burn CPU. This test ensure that blockingQuery // systematically does the right thing to prevent future bugs like that. - { + t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 0, } @@ -327,11 +321,11 @@ func TestRPC_blockingQuery(t *testing.T) { assert.True(t, t1.Sub(t0) > 20*time.Millisecond, "should have actually blocked waiting for timeout") - } + }) // Perform a query that blocks and gets interrupted when the state store // is abandoned. - { + t.Run("blocking query interrupted by abandonCh", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 3, } @@ -360,13 +354,10 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 1 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) { opts := structs.QueryOptions{ diff --git a/agent/consul/txn_endpoint_test.go b/agent/consul/txn_endpoint_test.go index 106f8f679..576343edf 100644 --- a/agent/consul/txn_endpoint_test.go +++ b/agent/consul/txn_endpoint_test.go @@ -817,6 +817,7 @@ func TestTxn_Read(t *testing.T) { }, QueryMeta: structs.QueryMeta{ KnownLeader: true, + Index: 1, }, } require.Equal(t, expected, out) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index cb84f77d9..168c70efe 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -370,7 +370,9 @@ func (q QueryBackend) String() string { // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { - // Index in the raft log of the latest item returned by the query. + // Index in the raft log of the latest item returned by the query. If the + // query did not return any results the Index will be a value that will + // change when a new item is added. Index uint64 // If AllowStale is used, this is time elapsed since diff --git a/agent/txn_endpoint_test.go b/agent/txn_endpoint_test.go index 1d6d3f01b..6b3a7c468 100644 --- a/agent/txn_endpoint_test.go +++ b/agent/txn_endpoint_test.go @@ -10,11 +10,12 @@ import ( "testing" "time" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/raft" - "github.com/stretchr/testify/assert" ) func TestTxnEndpoint_Bad_JSON(t *testing.T) { @@ -385,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) { }, QueryMeta: structs.QueryMeta{ KnownLeader: true, + Index: 1, }, } assert.Equal(t, expected, txnResp)