Merge pull request #12110 from hashicorp/dnephin/blocking-queries-not-found
rpc: make blocking queries for non-existent items more efficient
This commit is contained in:
commit
2d5254a73b
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
rpc: improve blocking queries for items that do not exist, by continuing to block until they exist (or the timeout).
|
||||||
|
```
|
|
@ -322,6 +322,9 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke
|
||||||
|
|
||||||
reply.Index, reply.Token = index, token
|
reply.Index, reply.Token = index, token
|
||||||
reply.SourceDatacenter = args.Datacenter
|
reply.SourceDatacenter = args.Datacenter
|
||||||
|
if token == nil {
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1045,6 +1048,9 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.Policy = index, policy
|
reply.Index, reply.Policy = index, policy
|
||||||
|
if policy == nil {
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1428,6 +1434,9 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.Role = index, role
|
reply.Index, reply.Role = index, role
|
||||||
|
if role == nil {
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1795,12 +1804,14 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str
|
||||||
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.Store) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, rule, err := state.ACLBindingRuleGetByID(ws, args.BindingRuleID, &args.EnterpriseMeta)
|
index, rule, err := state.ACLBindingRuleGetByID(ws, args.BindingRuleID, &args.EnterpriseMeta)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.BindingRule = index, rule
|
reply.Index, reply.BindingRule = index, rule
|
||||||
|
if rule == nil {
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -2052,16 +2063,16 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc
|
||||||
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.Store) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, method, err := state.ACLAuthMethodGetByName(ws, args.AuthMethodName, &args.EnterpriseMeta)
|
index, method, err := state.ACLAuthMethodGetByName(ws, args.AuthMethodName, &args.EnterpriseMeta)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if method != nil {
|
reply.Index, reply.AuthMethod = index, method
|
||||||
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
|
if method == nil {
|
||||||
|
return errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.AuthMethod = index, method
|
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,7 +207,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE
|
||||||
|
|
||||||
reply.Index = index
|
reply.Index = index
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
return nil
|
return errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Entry = entry
|
reply.Entry = entry
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -9,6 +10,7 @@ import (
|
||||||
|
|
||||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -302,6 +304,71 @@ func TestConfigEntry_Get(t *testing.T) {
|
||||||
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, s1 := testServerWithConfig(t)
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
store := s1.fsm.State()
|
||||||
|
|
||||||
|
entry := &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "alpha",
|
||||||
|
}
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(1, entry))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var count int
|
||||||
|
|
||||||
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
g.Go(func() error {
|
||||||
|
args := structs.ConfigEntryQuery{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "does-not-exist",
|
||||||
|
}
|
||||||
|
args.QueryOptions.MaxQueryTime = time.Second
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
var out structs.ConfigEntryResponse
|
||||||
|
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
|
||||||
|
count++
|
||||||
|
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
g.Go(func() error {
|
||||||
|
for i := uint64(0); i < 200; i++ {
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
entry := &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: fmt.Sprintf("other%d", i),
|
||||||
|
}
|
||||||
|
if err := store.EnsureConfigEntry(i+2, entry); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, g.Wait())
|
||||||
|
// The test is a bit racy because of the timing of the two goroutines, so
|
||||||
|
// we relax the check for the count to be within a small range.
|
||||||
|
if count < 2 || count > 3 {
|
||||||
|
t.Fatalf("expected count to be 2 or 3, got %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
|
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -267,6 +267,7 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
reply.Index, reply.Coordinates = index, coords
|
reply.Index, reply.Coordinates = index, coords
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs
|
||||||
|
|
||||||
reply.Index = index
|
reply.Index = index
|
||||||
if fedState == nil {
|
if fedState == nil {
|
||||||
return nil
|
return errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.State = fedState
|
reply.State = fedState
|
||||||
|
|
|
@ -160,18 +160,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
||||||
}
|
}
|
||||||
|
|
||||||
if ent == nil {
|
if ent == nil {
|
||||||
// Must provide non-zero index to prevent blocking
|
|
||||||
// Index 1 is impossible anyways (due to Raft internals)
|
|
||||||
if index == 0 {
|
|
||||||
reply.Index = 1
|
|
||||||
} else {
|
|
||||||
reply.Index = index
|
reply.Index = index
|
||||||
}
|
|
||||||
reply.Entries = nil
|
reply.Entries = nil
|
||||||
} else {
|
return errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
reply.Index = ent.ModifyIndex
|
reply.Index = ent.ModifyIndex
|
||||||
reply.Entries = structs.DirEntries{ent}
|
reply.Entries = structs.DirEntries{ent}
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -983,6 +983,9 @@ func (s *Server) blockingQuery(
|
||||||
var ws memdb.WatchSet
|
var ws memdb.WatchSet
|
||||||
err := query(ws, s.fsm.State())
|
err := query(ws, s.fsm.State())
|
||||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||||
|
if errors.Is(err, errNotFound) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -995,6 +998,8 @@ func (s *Server) blockingQuery(
|
||||||
// decrement the count when the function returns.
|
// decrement the count when the function returns.
|
||||||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||||
|
|
||||||
|
var notFound bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if opts.GetRequireConsistent() {
|
if opts.GetRequireConsistent() {
|
||||||
if err := s.consistentRead(); err != nil {
|
if err := s.consistentRead(); err != nil {
|
||||||
|
@ -1014,7 +1019,15 @@ func (s *Server) blockingQuery(
|
||||||
|
|
||||||
err := query(ws, state)
|
err := query(ws, state)
|
||||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||||
if err != nil {
|
switch {
|
||||||
|
case errors.Is(err, errNotFound):
|
||||||
|
if notFound {
|
||||||
|
// query result has not changed
|
||||||
|
minQueryIndex = responseMeta.GetIndex()
|
||||||
|
}
|
||||||
|
|
||||||
|
notFound = true
|
||||||
|
case err != nil:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1037,6 +1050,8 @@ func (s *Server) blockingQuery(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errNotFound = fmt.Errorf("no data found for query")
|
||||||
|
|
||||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||||
//
|
//
|
||||||
// Note: This method must be called *after* filtering query results with ACLs.
|
// Note: This method must be called *after* filtering query results with ACLs.
|
||||||
|
|
|
@ -227,11 +227,9 @@ func (m *MockSink) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRPC_blockingQuery(t *testing.T) {
|
func TestServer_blockingQuery(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
dir, s := testServer(t)
|
_, s := testServerWithConfig(t)
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
defer s.Shutdown()
|
|
||||||
|
|
||||||
// Perform a non-blocking query. Note that it's significant that the meta has
|
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||||
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||||
|
@ -391,6 +389,93 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls")
|
require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("non-blocking query for item that does not exist", func(t *testing.T) {
|
||||||
|
opts := structs.QueryOptions{}
|
||||||
|
meta := structs.QueryMeta{}
|
||||||
|
calls := 0
|
||||||
|
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||||
|
calls++
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, calls)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("blocking query for item that does not exist", func(t *testing.T) {
|
||||||
|
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||||
|
meta := structs.QueryMeta{}
|
||||||
|
calls := 0
|
||||||
|
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||||
|
calls++
|
||||||
|
if calls == 1 {
|
||||||
|
meta.Index = 3
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
ws.Add(ch)
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
meta.Index = 5
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, calls)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("blocking query for item that existed and is removed", func(t *testing.T) {
|
||||||
|
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||||
|
meta := structs.QueryMeta{}
|
||||||
|
calls := 0
|
||||||
|
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||||
|
calls++
|
||||||
|
if calls == 1 {
|
||||||
|
meta.Index = 3
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
ws.Add(ch)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
meta.Index = 5
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
|
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, calls)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("blocking query for non-existent item that is created", func(t *testing.T) {
|
||||||
|
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||||
|
meta := structs.QueryMeta{}
|
||||||
|
calls := 0
|
||||||
|
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||||
|
calls++
|
||||||
|
if calls == 1 {
|
||||||
|
meta.Index = 3
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
ws.Add(ch)
|
||||||
|
return errNotFound
|
||||||
|
}
|
||||||
|
meta.Index = 5
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := s.blockingQuery(&opts, &meta, fn)
|
||||||
|
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, calls)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRPC_ReadyForConsistentReads(t *testing.T) {
|
func TestRPC_ReadyForConsistentReads(t *testing.T) {
|
||||||
|
|
|
@ -198,6 +198,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
|
||||||
reply.Sessions = structs.Sessions{session}
|
reply.Sessions = structs.Sessions{session}
|
||||||
} else {
|
} else {
|
||||||
reply.Sessions = nil
|
reply.Sessions = nil
|
||||||
|
return errNotFound
|
||||||
}
|
}
|
||||||
s.srv.filterACLWithAuthorizer(authz, reply)
|
s.srv.filterACLWithAuthorizer(authz, reply)
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue