Merge pull request #12343 from hashicorp/dnephin/blocking-query-docs
rpc: improve docs for blockingQuery
This commit is contained in:
commit
7d190ceb8f
|
@ -8,14 +8,15 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/lib/stringslice"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// GatewayLocator assists in selecting an appropriate mesh gateway when wan
|
||||
|
@ -269,7 +270,7 @@ func getRandomItem(items []string) string {
|
|||
}
|
||||
|
||||
type serverDelegate interface {
|
||||
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
|
||||
blockingQuery(queryOpts blockingQueryOptions, queryMeta blockingQueryResponseMeta, fn queryFn) error
|
||||
IsLeader() bool
|
||||
LeaderLastContact() time.Time
|
||||
setDatacenterSupportsFederationStates()
|
||||
|
|
|
@ -493,8 +493,8 @@ func (d *testServerDelegate) datacenterSupportsFederationStates() bool {
|
|||
|
||||
// This is just enough to exercise the logic.
|
||||
func (d *testServerDelegate) blockingQuery(
|
||||
queryOpts structs.QueryOptionsCompat,
|
||||
queryMeta structs.QueryMetaCompat,
|
||||
queryOpts blockingQueryOptions,
|
||||
queryMeta blockingQueryResponseMeta,
|
||||
fn queryFn,
|
||||
) error {
|
||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
@ -24,6 +23,8 @@ import (
|
|||
"github.com/hashicorp/yamux"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||
|
@ -910,35 +911,82 @@ func (s *Server) raftApplyWithEncoder(
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// queryFn is used to perform a query operation. If a re-query is needed, the
|
||||
// passed-in watch set will be used to block for changes. The passed-in state
|
||||
// store should be used (vs. calling fsm.State()) since the given state store
|
||||
// will be correctly watched for changes if the state store is restored from
|
||||
// a snapshot.
|
||||
// queryFn is used to perform a query operation. See Server.blockingQuery for
|
||||
// the requirements of this function.
|
||||
type queryFn func(memdb.WatchSet, *state.Store) error
|
||||
|
||||
// blockingQuery is used to process a potentially blocking query operation.
|
||||
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
|
||||
// blockingQueryOptions are options used by Server.blockingQuery to modify the
|
||||
// behaviour of the query operation, or to populate response metadata.
|
||||
type blockingQueryOptions interface {
|
||||
GetToken() string
|
||||
GetMinQueryIndex() uint64
|
||||
GetMaxQueryTime() time.Duration
|
||||
GetRequireConsistent() bool
|
||||
}
|
||||
|
||||
// blockingQueryResponseMeta is an interface used to populate the response struct
|
||||
// with metadata about the query and the state of the server.
|
||||
type blockingQueryResponseMeta interface {
|
||||
SetLastContact(time.Duration)
|
||||
SetKnownLeader(bool)
|
||||
GetIndex() uint64
|
||||
SetIndex(uint64)
|
||||
SetResultsFilteredByACLs(bool)
|
||||
}
|
||||
|
||||
// blockingQuery performs a blocking query if opts.GetMinQueryIndex is
|
||||
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
|
||||
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
|
||||
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
|
||||
// after performing the query.
|
||||
//
|
||||
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
|
||||
// still the cluster leader before performing the query.
|
||||
//
|
||||
// The query function is expected to be a closure that has access to responseMeta
|
||||
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
|
||||
// If query function returns an error, the error is returned to the caller immediately.
|
||||
//
|
||||
// The query function must follow these rules:
|
||||
//
|
||||
// 1. to access data it must use the passed in state.Store.
|
||||
// 2. it must set the responseMeta.Index to an index greater than
|
||||
// opts.GetMinQueryIndex if the results return by the query have changed.
|
||||
// 3. any channels added to the memdb.WatchSet must unblock when the results
|
||||
// returned by the query have changed.
|
||||
//
|
||||
// To ensure optimal performance of the query, the query function should make a
|
||||
// best-effort attempt to follow these guidelines:
|
||||
//
|
||||
// 1. only set responseMeta.Index to an index greater than
|
||||
// opts.GetMinQueryIndex when the results returned by the query have changed.
|
||||
// 2. any channels added to the memdb.WatchSet should only unblock when the
|
||||
// results returned by the query have changed.
|
||||
func (s *Server) blockingQuery(
|
||||
opts blockingQueryOptions,
|
||||
responseMeta blockingQueryResponseMeta,
|
||||
query queryFn,
|
||||
) error {
|
||||
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}
|
||||
|
||||
metrics.IncrCounter([]string{"rpc", "query"}, 1)
|
||||
|
||||
minQueryIndex := queryOpts.GetMinQueryIndex()
|
||||
minQueryIndex := opts.GetMinQueryIndex()
|
||||
// Perform a non-blocking query
|
||||
if minQueryIndex == 0 {
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if opts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var ws memdb.WatchSet
|
||||
err := fn(ws, s.fsm.State())
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
err := query(ws, s.fsm.State())
|
||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||
return err
|
||||
}
|
||||
|
||||
timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
|
||||
timeout := s.rpcQueryTimeout(opts.GetMaxQueryTime())
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
|
@ -948,7 +996,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
|||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||
|
||||
for {
|
||||
if queryOpts.GetRequireConsistent() {
|
||||
if opts.GetRequireConsistent() {
|
||||
if err := s.consistentRead(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -964,13 +1012,13 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
|||
// whole state store is abandoned.
|
||||
ws.Add(state.AbandonCh())
|
||||
|
||||
err := fn(ws, state)
|
||||
s.setQueryMeta(queryMeta, queryOpts.GetToken())
|
||||
err := query(ws, state)
|
||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if queryMeta.GetIndex() > minQueryIndex {
|
||||
if responseMeta.GetIndex() > minQueryIndex {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -992,7 +1040,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
|||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||
//
|
||||
// Note: This method must be called *after* filtering query results with ACLs.
|
||||
func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
|
||||
func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) {
|
||||
if s.IsLeader() {
|
||||
m.SetLastContact(0)
|
||||
m.SetKnownLeader(true)
|
||||
|
@ -1085,7 +1133,7 @@ func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
|
|||
// will only check whether it is blank or not). It's a safe assumption because
|
||||
// ResultsFilteredByACLs is only set to try when applying the already-resolved
|
||||
// token's policies.
|
||||
func maskResultsFilteredByACLs(token string, meta structs.QueryMetaCompat) {
|
||||
func maskResultsFilteredByACLs(token string, meta blockingQueryResponseMeta) {
|
||||
if token == "" {
|
||||
meta.SetResultsFilteredByACLs(false)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue