rpc: improve docs for blockingQuery

Follow the Go convention of accepting a small interface that documents
the methods used by the function.

Clarify the rules for implementing a query function passed to
blockingQuery.
This commit is contained in:
Daniel Nephin 2022-02-14 17:27:33 -05:00
parent b216d52b66
commit a4e1c59cd8
3 changed files with 73 additions and 24 deletions

View File

@ -8,14 +8,15 @@ import (
"sync"
"time"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
)
// GatewayLocator assists in selecting an appropriate mesh gateway when wan
@ -269,7 +270,7 @@ func getRandomItem(items []string) string {
}
type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
blockingQuery(queryOpts blockingQueryOptions, queryMeta blockingQueryResponseMeta, fn queryFn) error
IsLeader() bool
LeaderLastContact() time.Time
setDatacenterSupportsFederationStates()

View File

@ -493,8 +493,8 @@ func (d *testServerDelegate) datacenterSupportsFederationStates() bool {
// This is just enough to exercise the logic.
func (d *testServerDelegate) blockingQuery(
queryOpts structs.QueryOptionsCompat,
queryMeta structs.QueryMetaCompat,
queryOpts blockingQueryOptions,
queryMeta blockingQueryResponseMeta,
fn queryFn,
) error {
minQueryIndex := queryOpts.GetMinQueryIndex()

View File

@ -14,7 +14,6 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@ -24,6 +23,8 @@ import (
"github.com/hashicorp/yamux"
"google.golang.org/grpc"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
@ -910,35 +911,82 @@ func (s *Server) raftApplyWithEncoder(
return resp, nil
}
// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes. The passed-in state
// store should be used (vs. calling fsm.State()) since the given state store
// will be correctly watched for changes if the state store is restored from
// a snapshot.
// queryFn is used to perform a query operation. See Server.blockingQuery for
// the requirements of this function.
type queryFn func(memdb.WatchSet, *state.Store) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
// blockingQueryOptions are options used by Server.blockingQuery to modify the
// behaviour of the query operation, or to populate response metadata.
type blockingQueryOptions interface {
GetToken() string
GetMinQueryIndex() uint64
GetMaxQueryTime() time.Duration
GetRequireConsistent() bool
}
// blockingQueryResponseMeta is an interface used to populate the response struct
// with metadata about the query and the state of the server.
type blockingQueryResponseMeta interface {
SetLastContact(time.Duration)
SetKnownLeader(bool)
GetIndex() uint64
SetIndex(uint64)
SetResultsFilteredByACLs(bool)
}
// blockingQuery performs a blocking query if opts.GetMinQueryIndex is
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
// after performing the query.
//
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
// still the cluster leader before performing the query.
//
// The query function is expected to be a closure that has access to responseMeta
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
// If query function returns an error, the error is returned to the caller immediately.
//
// The query function must follow these rules:
//
// 1. to access data it must use the passed in state.Store.
// 2. it must set the responseMeta.Index to an index greater than
// opts.GetMinQueryIndex if the results return by the query have changed.
// 3. any channels added to the memdb.WatchSet must unblock when the results
// returned by the query have changed.
//
// To ensure optimal performance of the query, the query function should make a
// best-effort attempt to follow these guidelines:
//
// 1. only set responseMeta.Index to an index greater than
// opts.GetMinQueryIndex when the results returned by the query have changed.
// 2. any channels added to the memdb.WatchSet should only unblock when the
// results returned by the query have changed.
func (s *Server) blockingQuery(
opts blockingQueryOptions,
responseMeta blockingQueryResponseMeta,
query queryFn,
) error {
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
metrics.IncrCounter([]string{"rpc", "query"}, 1)
minQueryIndex := queryOpts.GetMinQueryIndex()
minQueryIndex := opts.GetMinQueryIndex()
// Perform a non-blocking query
if minQueryIndex == 0 {
if queryOpts.GetRequireConsistent() {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
}
var ws memdb.WatchSet
err := fn(ws, s.fsm.State())
s.setQueryMeta(queryMeta, queryOpts.GetToken())
err := query(ws, s.fsm.State())
s.setQueryMeta(responseMeta, opts.GetToken())
return err
}
timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
timeout := s.rpcQueryTimeout(opts.GetMaxQueryTime())
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@ -948,7 +996,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
for {
if queryOpts.GetRequireConsistent() {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
@ -964,13 +1012,13 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// whole state store is abandoned.
ws.Add(state.AbandonCh())
err := fn(ws, state)
s.setQueryMeta(queryMeta, queryOpts.GetToken())
err := query(ws, state)
s.setQueryMeta(responseMeta, opts.GetToken())
if err != nil {
return err
}
if queryMeta.GetIndex() > minQueryIndex {
if responseMeta.GetIndex() > minQueryIndex {
return nil
}
@ -992,7 +1040,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// setQueryMeta is used to populate the QueryMeta data for an RPC call
//
// Note: This method must be called *after* filtering query results with ACLs.
func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) {
if s.IsLeader() {
m.SetLastContact(0)
m.SetKnownLeader(true)
@ -1085,7 +1133,7 @@ func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
// will only check whether it is blank or not). It's a safe assumption because
// ResultsFilteredByACLs is only set to try when applying the already-resolved
// token's policies.
func maskResultsFilteredByACLs(token string, meta structs.QueryMetaCompat) {
func maskResultsFilteredByACLs(token string, meta blockingQueryResponseMeta) {
if token == "" {
meta.SetResultsFilteredByACLs(false)
}