agent/consul: Add support for NotModified to two endpoints

A query made with AllowNotModifiedResponse and a MinIndex, where the
result has the same Index as MinIndex, will return an empty response
with QueryMeta.NotModified set to true.

Co-authored-by: Pierre Souchay <pierresouchay@users.noreply.github.com>
This commit is contained in:
Daniel Nephin 2020-06-30 14:11:43 -04:00
parent 41ba3fca9c
commit dfa8856e5f
3 changed files with 61 additions and 12 deletions

View File

@ -257,19 +257,21 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var nodes structs.Nodes
var err error var err error
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters) reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
} else { } else {
index, nodes, err = state.Nodes(ws) reply.Index, reply.Nodes, err = state.Nodes(ws)
} }
if err != nil { if err != nil {
return err return err
} }
if isUnmodified(args.QueryOptions, reply.Index) {
reply.QueryMeta.NotModified = true
reply.Nodes = nil
return nil
}
reply.Index, reply.Nodes = index, nodes
if err := c.srv.filterACL(args.Token, reply); err != nil { if err := c.srv.filterACL(args.Token, reply); err != nil {
return err return err
} }
@ -284,13 +286,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
}) })
} }
func isUnmodified(opts structs.QueryOptions, index uint64) bool {
return opts.AllowNotModifiedResponse && opts.MinQueryIndex > 0 && opts.MinQueryIndex == index
}
// ListServices is used to query the services in a DC // ListServices is used to query the services in a DC
func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error {
if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done {
return err return err
} }
(*reply).EnterpriseMeta = args.EnterpriseMeta reply.EnterpriseMeta = args.EnterpriseMeta
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil { if err != nil {
@ -305,19 +311,21 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.Services
var err error var err error
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta) reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta)
} else { } else {
index, services, err = state.Services(ws, &args.EnterpriseMeta) reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta)
} }
if err != nil { if err != nil {
return err return err
} }
if isUnmodified(args.QueryOptions, reply.Index) {
reply.Services = nil
reply.QueryMeta.NotModified = true
return nil
}
reply.Index, reply.Services, reply.EnterpriseMeta = index, services, args.EnterpriseMeta
return c.srv.filterACLWithAuthorizer(authz, reply) return c.srv.filterACLWithAuthorizer(authz, reply)
}) })
} }

View File

@ -783,6 +783,21 @@ func TestCatalog_ListNodes(t *testing.T) {
if out.Nodes[0].Address != "127.0.0.1" { if out.Nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
require.False(t, out.QueryMeta.NotModified)
t.Run("with option AllowNotModifiedResponse", func(t *testing.T) {
args.QueryOptions = structs.QueryOptions{
MinQueryIndex: out.QueryMeta.Index,
MaxQueryTime: 20 * time.Millisecond,
AllowNotModifiedResponse: true,
}
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
require.NoError(t, err)
require.Equal(t, out.Index, out.QueryMeta.Index)
require.Len(t, out.Nodes, 0)
require.True(t, out.QueryMeta.NotModified, "NotModified should be true")
})
} }
func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) { func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) {
@ -1322,6 +1337,21 @@ func TestCatalog_ListServices(t *testing.T) {
if out.Services["db"][0] != "primary" { if out.Services["db"][0] != "primary" {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
require.False(t, out.QueryMeta.NotModified)
t.Run("with option AllowNotModifiedResponse", func(t *testing.T) {
args.QueryOptions = structs.QueryOptions{
MinQueryIndex: out.QueryMeta.Index,
MaxQueryTime: 20 * time.Millisecond,
AllowNotModifiedResponse: true,
}
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
require.NoError(t, err)
require.Equal(t, out.Index, out.QueryMeta.Index)
require.Len(t, out.Services, 0)
require.True(t, out.QueryMeta.NotModified, "NotModified should be true")
})
} }
func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) { func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {

View File

@ -212,6 +212,11 @@ type QueryOptions struct {
// Filter specifies the go-bexpr filter expression to be used for // Filter specifies the go-bexpr filter expression to be used for
// filtering the data prior to returning a response // filtering the data prior to returning a response
Filter string Filter string
// AllowNotModifiedResponse indicates that if the MinIndex matches the
// QueryMeta.Index, the response can be left empty and QueryMeta.NotModified
// will be set to true to indicate the result of the query has not changed.
AllowNotModifiedResponse bool
} }
// IsRead is always true for QueryOption. // IsRead is always true for QueryOption.
@ -268,7 +273,7 @@ func (w *WriteRequest) SetTokenSecret(s string) {
// QueryMeta allows a query response to include potentially // QueryMeta allows a query response to include potentially
// useful metadata about a query // useful metadata about a query
type QueryMeta struct { type QueryMeta struct {
// This is the index associated with the read // Index in the raft log of the latest item returned by the query.
Index uint64 Index uint64
// If AllowStale is used, this is time elapsed since // If AllowStale is used, this is time elapsed since
@ -283,6 +288,12 @@ type QueryMeta struct {
// Having `discovery_max_stale` on the agent can affect whether // Having `discovery_max_stale` on the agent can affect whether
// the request was served by a leader. // the request was served by a leader.
ConsistencyLevel string ConsistencyLevel string
// NotModified is true when the Index of the query is the same value as the
// requested MinIndex. It indicates that the entity has not been modified.
// When NotModified is true, the response will not contain the result of
// the query.
NotModified bool
} }
// RegisterRequest is used for the Catalog.Register endpoint // RegisterRequest is used for the Catalog.Register endpoint