Merge pull request #8215 from hashicorp/dnephin/support-not-modified-response-server
agent/consul: Add support for NotModified to two endpoints
This commit is contained in:
commit
24c6bcfbe8
|
@ -257,19 +257,21 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
var index uint64
|
||||
var nodes structs.Nodes
|
||||
var err error
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
|
||||
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
|
||||
} else {
|
||||
index, nodes, err = state.Nodes(ws)
|
||||
reply.Index, reply.Nodes, err = state.Nodes(ws)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isUnmodified(args.QueryOptions, reply.Index) {
|
||||
reply.QueryMeta.NotModified = true
|
||||
reply.Nodes = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
reply.Index, reply.Nodes = index, nodes
|
||||
if err := c.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -284,13 +286,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
|||
})
|
||||
}
|
||||
|
||||
func isUnmodified(opts structs.QueryOptions, index uint64) bool {
|
||||
return opts.AllowNotModifiedResponse && opts.MinQueryIndex > 0 && opts.MinQueryIndex == index
|
||||
}
|
||||
|
||||
// ListServices is used to query the services in a DC
|
||||
func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error {
|
||||
if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
(*reply).EnterpriseMeta = args.EnterpriseMeta
|
||||
reply.EnterpriseMeta = args.EnterpriseMeta
|
||||
|
||||
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
|
||||
if err != nil {
|
||||
|
@ -305,19 +311,21 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
var index uint64
|
||||
var services structs.Services
|
||||
var err error
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta)
|
||||
reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta)
|
||||
} else {
|
||||
index, services, err = state.Services(ws, &args.EnterpriseMeta)
|
||||
reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isUnmodified(args.QueryOptions, reply.Index) {
|
||||
reply.Services = nil
|
||||
reply.QueryMeta.NotModified = true
|
||||
return nil
|
||||
}
|
||||
|
||||
reply.Index, reply.Services, reply.EnterpriseMeta = index, services, args.EnterpriseMeta
|
||||
return c.srv.filterACLWithAuthorizer(authz, reply)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -783,6 +783,21 @@ func TestCatalog_ListNodes(t *testing.T) {
|
|||
if out.Nodes[0].Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
require.False(t, out.QueryMeta.NotModified)
|
||||
|
||||
t.Run("with option AllowNotModifiedResponse", func(t *testing.T) {
|
||||
args.QueryOptions = structs.QueryOptions{
|
||||
MinQueryIndex: out.QueryMeta.Index,
|
||||
MaxQueryTime: 20 * time.Millisecond,
|
||||
AllowNotModifiedResponse: true,
|
||||
}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, out.Index, out.QueryMeta.Index)
|
||||
require.Len(t, out.Nodes, 0)
|
||||
require.True(t, out.QueryMeta.NotModified, "NotModified should be true")
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) {
|
||||
|
@ -1322,6 +1337,21 @@ func TestCatalog_ListServices(t *testing.T) {
|
|||
if out.Services["db"][0] != "primary" {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
require.False(t, out.QueryMeta.NotModified)
|
||||
|
||||
t.Run("with option AllowNotModifiedResponse", func(t *testing.T) {
|
||||
args.QueryOptions = structs.QueryOptions{
|
||||
MinQueryIndex: out.QueryMeta.Index,
|
||||
MaxQueryTime: 20 * time.Millisecond,
|
||||
AllowNotModifiedResponse: true,
|
||||
}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, out.Index, out.QueryMeta.Index)
|
||||
require.Len(t, out.Services, 0)
|
||||
require.True(t, out.QueryMeta.NotModified, "NotModified should be true")
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
|
||||
|
|
|
@ -212,6 +212,11 @@ type QueryOptions struct {
|
|||
// Filter specifies the go-bexpr filter expression to be used for
|
||||
// filtering the data prior to returning a response
|
||||
Filter string
|
||||
|
||||
// AllowNotModifiedResponse indicates that if the MinIndex matches the
|
||||
// QueryMeta.Index, the response can be left empty and QueryMeta.NotModified
|
||||
// will be set to true to indicate the result of the query has not changed.
|
||||
AllowNotModifiedResponse bool
|
||||
}
|
||||
|
||||
// IsRead is always true for QueryOption.
|
||||
|
@ -268,7 +273,7 @@ func (w *WriteRequest) SetTokenSecret(s string) {
|
|||
// QueryMeta allows a query response to include potentially
|
||||
// useful metadata about a query
|
||||
type QueryMeta struct {
|
||||
// This is the index associated with the read
|
||||
// Index in the raft log of the latest item returned by the query.
|
||||
Index uint64
|
||||
|
||||
// If AllowStale is used, this is time elapsed since
|
||||
|
@ -283,6 +288,12 @@ type QueryMeta struct {
|
|||
// Having `discovery_max_stale` on the agent can affect whether
|
||||
// the request was served by a leader.
|
||||
ConsistencyLevel string
|
||||
|
||||
// NotModified is true when the Index of the query is the same value as the
|
||||
// requested MinIndex. It indicates that the entity has not been modified.
|
||||
// When NotModified is true, the response will not contain the result of
|
||||
// the query.
|
||||
NotModified bool
|
||||
}
|
||||
|
||||
// RegisterRequest is used for the Catalog.Register endpoint
|
||||
|
|
Loading…
Reference in New Issue