Add pagination, filtering and sort to more API endpoints (#12186)

This commit is contained in:
Luiz Aoqui 2022-03-08 20:54:17 -05:00 committed by GitHub
parent e096a0a5ab
commit ab8ce87bba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 2027 additions and 508 deletions

7
.changelog/12186.txt Normal file
View File

@ -0,0 +1,7 @@
```release-note:improvement
api: Add support for filtering, sorting, and pagination to the ACL tokens and allocations list endpoints
```
```release-note:improvement
api: Add support for filtering and pagination to the jobs and volumes list endpoints
```

View File

@ -82,10 +82,10 @@ type QueryOptions struct {
// previous response.
NextToken string
// Ascending is used to have results sorted in ascending chronological order.
// Reverse is used to reverse the default order of list results.
//
// Currently only supported by evaluations.List and deployments.list endpoints.
Ascending bool
// Currently only supported by specific endpoints.
Reverse bool
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
@ -605,8 +605,8 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.NextToken != "" {
r.params.Set("next_token", q.NextToken)
}
if q.Ascending {
r.params.Set("ascending", "true")
if q.Reverse {
r.params.Set("reverse", "true")
}
for k, v := range q.Params {
r.params.Set(k, v)

View File

@ -181,7 +181,7 @@ func TestSetQueryOptions(t *testing.T) {
WaitIndex: 1000,
WaitTime: 100 * time.Second,
AuthToken: "foobar",
Ascending: true,
Reverse: true,
}
r.setQueryOptions(q)
@ -199,7 +199,7 @@ func TestSetQueryOptions(t *testing.T) {
try("stale", "") // should not be present
try("index", "1000")
try("wait", "100000ms")
try("ascending", "true")
try("reverse", "true")
}
func TestQueryOptionsContext(t *testing.T) {

View File

@ -788,7 +788,7 @@ func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, r *strin
parseNamespace(req, &b.Namespace)
parsePagination(req, b)
parseFilter(req, b)
parseAscending(req, b)
parseReverse(req, b)
return parseWait(resp, req, b)
}
@ -814,10 +814,10 @@ func parseFilter(req *http.Request, b *structs.QueryOptions) {
}
}
// parseAscending parses the ascending query parameter for QueryOptions
func parseAscending(req *http.Request, b *structs.QueryOptions) {
// parseReverse parses the reverse query parameter for QueryOptions
func parseReverse(req *http.Request, b *structs.QueryOptions) {
query := req.URL.Query()
b.Ascending = query.Get("ascending") == "true"
b.Reverse = query.Get("reverse") == "true"
}
// parseWriteRequest is a convenience method for endpoints that need to parse a

View File

@ -185,28 +185,28 @@ func (f *FSMHelper) StateAsMap() map[string][]interface{} {
}
// StateAsMap returns a json-able representation of the state
func StateAsMap(state *state.StateStore) map[string][]interface{} {
func StateAsMap(store *state.StateStore) map[string][]interface{} {
result := map[string][]interface{}{
"ACLPolicies": toArray(state.ACLPolicies(nil)),
"ACLTokens": toArray(state.ACLTokens(nil)),
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
"Jobs": toArray(state.Jobs(nil)),
"Nodes": toArray(state.Nodes(nil)),
"PeriodicLaunches": toArray(state.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(state.SITokenAccessors(nil)),
"ScalingEvents": toArray(state.ScalingEvents(nil)),
"ScalingPolicies": toArray(state.ScalingPolicies(nil)),
"VaultAccessors": toArray(state.VaultAccessors(nil)),
"ACLPolicies": toArray(store.ACLPolicies(nil)),
"ACLTokens": toArray(store.ACLTokens(nil, state.SortDefault)),
"Allocs": toArray(store.Allocs(nil, state.SortDefault)),
"CSIPlugins": toArray(store.CSIPlugins(nil)),
"CSIVolumes": toArray(store.CSIVolumes(nil)),
"Deployments": toArray(store.Deployments(nil, state.SortDefault)),
"Evals": toArray(store.Evals(nil, state.SortDefault)),
"Indexes": toArray(store.Indexes()),
"JobSummaries": toArray(store.JobSummaries(nil)),
"JobVersions": toArray(store.JobVersions(nil)),
"Jobs": toArray(store.Jobs(nil)),
"Nodes": toArray(store.Nodes(nil)),
"PeriodicLaunches": toArray(store.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(store.SITokenAccessors(nil)),
"ScalingEvents": toArray(store.ScalingEvents(nil)),
"ScalingPolicies": toArray(store.ScalingPolicies(nil)),
"VaultAccessors": toArray(store.VaultAccessors(nil)),
}
insertEnterpriseState(result, state)
insertEnterpriseState(result, store)
return result

View File

@ -3,6 +3,7 @@ package nomad
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
@ -14,6 +15,7 @@ import (
policy "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -652,6 +654,7 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo
}
// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
@ -659,34 +662,59 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo
// Iterate over all the tokens
var err error
var iter memdb.ResultIterator
var opts paginator.StructsTokenizerOptions
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ACLTokenByAccessorIDPrefix(ws, prefix)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else if args.GlobalOnly {
iter, err = state.ACLTokensByGlobal(ws, true)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else {
iter, err = state.ACLTokens(ws)
iter, err = state.ACLTokens(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
}
if err != nil {
return err
}
// Convert all the tokens to a list stub
reply.Tokens = nil
for {
raw := iter.Next()
if raw == nil {
break
}
token := raw.(*structs.ACLToken)
reply.Tokens = append(reply.Tokens, token.Stub())
tokenizer := paginator.NewStructsTokenizer(iter, opts)
var tokens []*structs.ACLTokenListStub
paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
func(raw interface{}) error {
token := raw.(*structs.ACLToken)
tokens = append(tokens, token.Stub())
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
nextToken, err := paginator.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
reply.QueryMeta.NextToken = nextToken
reply.Tokens = tokens
// Use the last index that affected the token table
index, err := state.Index("acl_token")
if err != nil {
return err
}
reply.Index = index
return nil
}}
return a.srv.blockingRPC(&opts)

View File

@ -919,6 +919,286 @@ func TestACLEndpoint_ListTokens(t *testing.T) {
assert.Equal(t, 2, len(resp3.Tokens))
}
func TestACLEndpoint_ListTokens_PaginationFiltering(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.ACLEnabled = true
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// create a set of ACL tokens. these are in the order that the state store
// will return them from the iterator (sorted by key) for ease of writing
// tests
mocks := []struct {
ids []string
typ string
}{
{ids: []string{"aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, typ: "management"}, // 0
{ids: []string{"aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}}, // 1
{ids: []string{"aaaaaa33-3350-4b4b-d185-0e1992ed43e9"}}, // 2
{ids: []string{"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}}, // 3
{ids: []string{"aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}}, // 4
{ids: []string{"aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}}, // 5
{ids: []string{"aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}}, // 6
{ids: []string{"00000111-3350-4b4b-d185-0e1992ed43e9"}}, // 7
{ids: []string{ // 8
"00000222-3350-4b4b-d185-0e1992ed43e9",
"00000333-3350-4b4b-d185-0e1992ed43e9",
}},
{}, // 9, index missing
{ids: []string{"bbbb1111-3350-4b4b-d185-0e1992ed43e9"}}, // 10
}
state := s1.fsm.State()
var bootstrapToken string
for i, m := range mocks {
tokensInTx := []*structs.ACLToken{}
for _, id := range m.ids {
token := mock.ACLToken()
token.AccessorID = id
token.Type = m.typ
tokensInTx = append(tokensInTx, token)
}
index := 1000 + uint64(i)
// bootstrap cluster with the first token
if i == 0 {
token := tokensInTx[0]
bootstrapToken = token.SecretID
err := s1.State().BootstrapACLTokens(structs.MsgTypeTestSetup, index, 0, token)
require.NoError(t, err)
err = state.UpsertACLTokens(structs.MsgTypeTestSetup, index, tokensInTx[1:])
require.NoError(t, err)
} else {
err := state.UpsertACLTokens(structs.MsgTypeTestSetup, index, tokensInTx)
require.NoError(t, err)
}
}
cases := []struct {
name string
prefix string
filter string
nextToken string
pageSize int32
expectedNextToken string
expectedIDs []string
expectedError string
}{
{
name: "test01 size-2 page-1",
pageSize: 2,
expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test02 size-2 page-1 with prefix",
prefix: "aaaa",
pageSize: 2,
expectedNextToken: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test03 size-2 page-2 default NS",
pageSize: 2,
nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test04 go-bexpr filter",
filter: `AccessorID matches "^a+[123]"`,
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test05 go-bexpr filter with pagination",
filter: `AccessorID matches "^a+[123]"`,
pageSize: 2,
expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test06 go-bexpr invalid expression",
filter: `NotValid`,
expectedError: "failed to read filter expression",
},
{
name: "test07 go-bexpr invalid field",
filter: `InvalidField == "value"`,
expectedError: "error finding value in datum",
},
{
name: "test08 non-lexicographic order",
pageSize: 1,
nextToken: "1007.00000111-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1008.00000222-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"00000111-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test09 same index",
pageSize: 1,
nextToken: "1008.00000222-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1008.00000333-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"00000222-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test10 missing index",
pageSize: 1,
nextToken: "1009.e9522802-0cd8-4b1d-9c9e-ab3d97938371",
expectedIDs: []string{
"bbbb1111-3350-4b4b-d185-0e1992ed43e9",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &structs.ACLTokenListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Prefix: tc.prefix,
Filter: tc.filter,
PerPage: tc.pageSize,
NextToken: tc.nextToken,
},
}
req.AuthToken = bootstrapToken
var resp structs.ACLTokenListResponse
err := msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", req, &resp)
if tc.expectedError == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedError)
return
}
gotIDs := []string{}
for _, token := range resp.Tokens {
gotIDs = append(gotIDs, token.AccessorID)
}
require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of tokens")
require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken")
})
}
}
func TestACLEndpoint_ListTokens_Order(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.ACLEnabled = true
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create register requests
uuid1 := uuid.Generate()
token1 := mock.ACLManagementToken()
token1.AccessorID = uuid1
uuid2 := uuid.Generate()
token2 := mock.ACLToken()
token2.AccessorID = uuid2
uuid3 := uuid.Generate()
token3 := mock.ACLToken()
token3.AccessorID = uuid3
// bootstrap cluster with the first token
bootstrapToken := token1.SecretID
err := s1.State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1000, 0, token1)
require.NoError(t, err)
err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1001, []*structs.ACLToken{token2})
require.NoError(t, err)
err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1002, []*structs.ACLToken{token3})
require.NoError(t, err)
// update token2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1003, []*structs.ACLToken{token2})
require.NoError(t, err)
t.Run("default", func(t *testing.T) {
// Lookup the tokens in the default order (oldest first)
get := &structs.ACLTokenListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
get.AuthToken = bootstrapToken
var resp structs.ACLTokenListResponse
err = msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Tokens, 3)
// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Tokens[0].CreateIndex)
require.Equal(t, uuid1, resp.Tokens[0].AccessorID)
require.Equal(t, uint64(1001), resp.Tokens[1].CreateIndex)
require.Equal(t, uuid2, resp.Tokens[1].AccessorID)
require.Equal(t, uint64(1002), resp.Tokens[2].CreateIndex)
require.Equal(t, uuid3, resp.Tokens[2].AccessorID)
})
t.Run("reverse", func(t *testing.T) {
// Lookup the tokens in reverse order (newest first)
get := &structs.ACLTokenListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Reverse: true,
},
}
get.AuthToken = bootstrapToken
var resp structs.ACLTokenListResponse
err = msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Tokens, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Tokens[0].CreateIndex)
require.Equal(t, uuid3, resp.Tokens[0].AccessorID)
require.Equal(t, uint64(1001), resp.Tokens[1].CreateIndex)
require.Equal(t, uuid2, resp.Tokens[1].AccessorID)
require.Equal(t, uint64(1000), resp.Tokens[2].CreateIndex)
require.Equal(t, uuid1, resp.Tokens[2].AccessorID)
})
}
func TestACLEndpoint_ListTokens_Blocking(t *testing.T) {
t.Parallel()

View File

@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"net/http"
"time"
metrics "github.com/armon/go-metrics"
@ -13,6 +14,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -32,111 +34,103 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
if args.RequestNamespace() == structs.AllNamespacesSentinel {
return a.listAllNamespaces(args, reply)
}
namespace := args.RequestNamespace()
var allow func(string) bool
// Check namespace read-job permissions
aclObj, err := a.srv.ResolveToken(args.AuthToken)
if err != nil {
switch {
case err != nil:
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
case aclObj == nil:
allow = func(string) bool {
return true
}
case namespace == structs.AllNamespacesSentinel:
allow = func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
}
case !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob):
return structs.ErrPermissionDenied
default:
allow = func(string) bool {
return true
}
}
// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the allocations
// Scan all the allocations
var err error
var iter memdb.ResultIterator
var opts paginator.StructsTokenizerOptions
prefix := args.QueryOptions.Prefix
if prefix != "" {
iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
iter, err = state.AllocsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}
var allocs []*structs.AllocListStub
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
allocs = append(allocs, alloc.Stub(args.Fields))
}
reply.Allocations = allocs
// Use the last index that affected the jobs table
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}
// listAllNamespaces lists all allocations across all namespaces
func (a *Alloc) listAllNamespaces(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
// Check for read-job permissions
aclObj, err := a.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// get list of accessible namespaces
allowedNSes, err := allowedNSes(aclObj, state, allow)
allowableNamespaces, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty allocations if token isn't authorized for any
// return empty allocation if token is not authorized for any
// namespace, matching other endpoints
reply.Allocations = []*structs.AllocListStub{}
reply.Allocations = make([]*structs.AllocListStub, 0)
} else if err != nil {
return err
} else {
var iter memdb.ResultIterator
var err error
if prefix != "" {
iter, err = state.AllocsByIDPrefixAllNSs(ws, prefix)
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.AllocsByIDPrefix(ws, namespace, prefix)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = state.AllocsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = state.Allocs(ws)
iter, err = state.Allocs(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
}
if err != nil {
return err
}
var allocs []*structs.AllocListStub
for raw := iter.Next(); raw != nil; raw = iter.Next() {
alloc := raw.(*structs.Allocation)
if allowedNSes != nil && !allowedNSes[alloc.Namespace] {
continue
}
allocs = append(allocs, alloc.Stub(args.Fields))
tokenizer := paginator.NewStructsTokenizer(iter, opts)
filters := []paginator.Filter{
paginator.NamespaceFilter{
AllowableNamespaces: allowableNamespaces,
},
}
reply.Allocations = allocs
var stubs []*structs.AllocListStub
paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions,
func(raw interface{}) error {
allocation := raw.(*structs.Allocation)
stubs = append(stubs, allocation.Stub(args.Fields))
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
nextToken, err := paginator.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
reply.QueryMeta.NextToken = nextToken
reply.Allocations = stubs
}
// Use the last index that affected the jobs table
// Use the last index that affected the allocs table
index, err := state.Index("allocs")
if err != nil {
return err

View File

@ -74,6 +74,330 @@ func TestAllocEndpoint_List(t *testing.T) {
require.Equal(t, uint64(1000), resp2.Index)
require.Len(t, resp2.Allocations, 1)
require.Equal(t, alloc.ID, resp2.Allocations[0].ID)
// Lookup allocations with a filter
get = &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
Filter: "TaskGroup == web",
},
}
var resp3 structs.AllocListResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp3))
require.Equal(t, uint64(1000), resp3.Index)
require.Len(t, resp3.Allocations, 1)
require.Equal(t, alloc.ID, resp3.Allocations[0].ID)
}
func TestAllocEndpoint_List_PaginationFiltering(t *testing.T) {
t.Parallel()
s1, _, cleanupS1 := TestACLServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// create a set of allocs and field values to filter on. these are in the order
// that the state store will return them from the iterator (sorted by create
// index), for ease of writing tests.
mocks := []struct {
ids []string
namespace string
group string
}{
{ids: []string{"aaaa1111-3350-4b4b-d185-0e1992ed43e9"}}, // 0
{ids: []string{"aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}}, // 1
{ids: []string{"aaaaaa33-3350-4b4b-d185-0e1992ed43e9"}, namespace: "non-default"}, // 2
{ids: []string{"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 3
{ids: []string{"aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, group: "goo"}, // 4
{ids: []string{"aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}}, // 5
{ids: []string{"aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 6
{ids: []string{"aaaaaaee-3350-4b4b-d185-0e1992ed43e9"}, group: "goo"}, // 7
{ids: []string{"aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 8
{ids: []string{"00000111-3350-4b4b-d185-0e1992ed43e9"}}, // 9
{ids: []string{ // 10
"00000222-3350-4b4b-d185-0e1992ed43e9",
"00000333-3350-4b4b-d185-0e1992ed43e9",
}},
{}, // 11, index missing
{ids: []string{"bbbb1111-3350-4b4b-d185-0e1992ed43e9"}}, // 12
}
state := s1.fsm.State()
var allocs []*structs.Allocation
for i, m := range mocks {
allocsInTx := []*structs.Allocation{}
for _, id := range m.ids {
alloc := mock.Alloc()
alloc.ID = id
if m.namespace != "" {
alloc.Namespace = m.namespace
}
if m.group != "" {
alloc.TaskGroup = m.group
}
allocs = append(allocs, alloc)
allocsInTx = append(allocsInTx, alloc)
}
// other fields
index := 1000 + uint64(i)
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index, allocsInTx))
}
require.NoError(t, state.UpsertNamespaces(1099, []*structs.Namespace{
{Name: "non-default"},
}))
aclToken := mock.CreatePolicyAndToken(t,
state, 1100, "test-valid-read",
mock.NamespacePolicy("*", "read", nil),
).SecretID
cases := []struct {
name string
namespace string
prefix string
nextToken string
pageSize int32
filter string
expIDs []string
expNextToken string
expErr string
}{
{
name: "test01 size-2 page-1 ns-default",
pageSize: 2,
expIDs: []string{ // first two items
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default ns
},
{
name: "test02 size-2 page-1 ns-default with-prefix",
prefix: "aaaa",
pageSize: 2,
expIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
},
{
name: "test03 size-2 page-2 ns-default",
pageSize: 2,
nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
expIDs: []string{
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
"aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test04 size-2 page-2 ns-default with prefix",
prefix: "aaaa",
pageSize: 2,
nextToken: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
expNextToken: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9",
expIDs: []string{
"aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
"aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test05 go-bexpr filter",
filter: `TaskGroup == "goo"`,
nextToken: "",
expIDs: []string{
"aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
"aaaaaaee-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test06 go-bexpr filter with pagination",
filter: `TaskGroup == "bar"`,
pageSize: 2,
expNextToken: "1008.aaaaaaff-3350-4b4b-d185-0e1992ed43e9",
expIDs: []string{
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
"aaaaaadd-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test07 go-bexpr filter namespace",
namespace: "non-default",
filter: `ID contains "aaa"`,
expIDs: []string{
"aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test08 go-bexpr wrong namespace",
namespace: "default",
filter: `Namespace == "non-default"`,
expIDs: []string(nil),
},
{
name: "test09 go-bexpr invalid expression",
filter: `NotValid`,
expErr: "failed to read filter expression",
},
{
name: "test10 go-bexpr invalid field",
filter: `InvalidField == "value"`,
expErr: "error finding value in datum",
},
{
name: "test11 non-lexicographic order",
pageSize: 1,
nextToken: "1009.00000111-3350-4b4b-d185-0e1992ed43e9",
expNextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9",
expIDs: []string{
"00000111-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test12 same index",
pageSize: 1,
nextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9",
expNextToken: "1010.00000333-3350-4b4b-d185-0e1992ed43e9",
expIDs: []string{
"00000222-3350-4b4b-d185-0e1992ed43e9",
},
},
{
name: "test13 missing index",
pageSize: 1,
nextToken: "1011.e9522802-0cd8-4b1d-9c9e-ab3d97938371",
expIDs: []string{
"bbbb1111-3350-4b4b-d185-0e1992ed43e9",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var req = &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Prefix: tc.prefix,
PerPage: tc.pageSize,
NextToken: tc.nextToken,
Filter: tc.filter,
},
Fields: &structs.AllocStubFields{
Resources: false,
TaskStates: false,
},
}
req.AuthToken = aclToken
var resp structs.AllocListResponse
err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp)
if tc.expErr == "" {
require.NoError(t, err)
} else {
require.Contains(t, err, tc.expErr)
}
var gotIDs []string
for _, alloc := range resp.Allocations {
gotIDs = append(gotIDs, alloc.ID)
}
require.Equal(t, tc.expIDs, gotIDs)
require.Equal(t, tc.expNextToken, resp.QueryMeta.NextToken)
})
}
}
func TestAllocEndpoint_List_order(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create register requests
uuid1 := uuid.Generate()
alloc1 := mock.Alloc()
alloc1.ID = uuid1
uuid2 := uuid.Generate()
alloc2 := mock.Alloc()
alloc2.ID = uuid2
uuid3 := uuid.Generate()
alloc3 := mock.Alloc()
alloc3.ID = uuid3
err := s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
require.NoError(t, err)
err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc2})
require.NoError(t, err)
err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc3})
require.NoError(t, err)
// update alloc2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2})
require.NoError(t, err)
t.Run("default", func(t *testing.T) {
// Lookup the allocations in the default order (oldest first)
get := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
}
var resp structs.AllocListResponse
err = msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Allocations, 3)
// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Allocations[0].CreateIndex)
require.Equal(t, uuid1, resp.Allocations[0].ID)
require.Equal(t, uint64(1001), resp.Allocations[1].CreateIndex)
require.Equal(t, uuid2, resp.Allocations[1].ID)
require.Equal(t, uint64(1002), resp.Allocations[2].CreateIndex)
require.Equal(t, uuid3, resp.Allocations[2].ID)
})
t.Run("reverse", func(t *testing.T) {
// Lookup the allocations in reverse order (newest first)
get := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Reverse: true,
},
}
var resp structs.AllocListResponse
err = msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Allocations, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Allocations[0].CreateIndex)
require.Equal(t, uuid3, resp.Allocations[0].ID)
require.Equal(t, uint64(1001), resp.Allocations[1].CreateIndex)
require.Equal(t, uuid2, resp.Allocations[1].ID)
require.Equal(t, uint64(1000), resp.Allocations[2].CreateIndex)
require.Equal(t, uuid1, resp.Allocations[2].ID)
})
}
func TestAllocEndpoint_List_Fields(t *testing.T) {
@ -327,18 +651,23 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) {
require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2}))
// Create the allocations
uuid1 := uuid.Generate()
alloc1 := mock.Alloc()
alloc1.ID = "a" + alloc1.ID[1:]
alloc1.ID = uuid1
alloc1.Namespace = ns1.Name
uuid2 := uuid.Generate()
alloc2 := mock.Alloc()
alloc2.ID = "b" + alloc2.ID[1:]
alloc2.ID = uuid2
alloc2.Namespace = ns2.Name
summary1 := mock.JobSummary(alloc1.JobID)
summary2 := mock.JobSummary(alloc2.JobID)
require.NoError(t, state.UpsertJobSummary(999, summary1))
require.NoError(t, state.UpsertJobSummary(999, summary2))
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
require.NoError(t, state.UpsertJobSummary(1000, summary1))
require.NoError(t, state.UpsertJobSummary(1001, summary2))
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc1}))
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2}))
t.Run("looking up all allocations", func(t *testing.T) {
get := &structs.AllocListRequest{
@ -349,7 +678,7 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) {
}
var resp structs.AllocListResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
require.Equal(t, uint64(1000), resp.Index)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Allocations, 2)
require.ElementsMatch(t,
[]string{resp.Allocations[0].ID, resp.Allocations[1].ID},
@ -367,26 +696,23 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) {
}
var resp structs.AllocListResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
require.Equal(t, uint64(1000), resp.Index)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Allocations, 1)
require.Equal(t, alloc1.ID, resp.Allocations[0].ID)
require.Equal(t, alloc1.Namespace, resp.Allocations[0].Namespace)
})
t.Run("looking up allocations with mismatch prefix", func(t *testing.T) {
// allocations were constructed above to have prefix starting with "a" or "b"
badPrefix := "cc"
get := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Prefix: badPrefix,
Prefix: "000000", // unlikely to match
},
}
var resp structs.AllocListResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp))
require.Equal(t, uint64(1000), resp.Index)
require.Equal(t, uint64(1003), resp.Index)
require.Empty(t, resp.Allocations)
})
}

View File

@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws, false)
iter, err := c.snap.Deployments(ws, state.SortDefault)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package nomad
import (
"errors"
"fmt"
"net/http"
"time"
metrics "github.com/armon/go-metrics"
@ -12,6 +13,7 @@ import (
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -136,40 +138,65 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
} else {
iter, err = snap.CSIVolumes(ws)
}
if err != nil {
return err
}
tokenizer := paginator.NewStructsTokenizer(
iter,
paginator.StructsTokenizerOptions{
WithNamespace: true,
WithID: true,
},
)
volFilter := paginator.GenericFilter{
Allow: func(raw interface{}) (bool, error) {
vol := raw.(*structs.CSIVolume)
// Remove (possibly again) by PluginID to handle passing both
// NodeID and PluginID
if args.PluginID != "" && args.PluginID != vol.PluginID {
return false, nil
}
// Remove by Namespace, since CSIVolumesByNodeID hasn't used
// the Namespace yet
if ns != structs.AllNamespacesSentinel && vol.Namespace != ns {
return false, nil
}
return true, nil
},
}
filters := []paginator.Filter{volFilter}
// Collect results, filter by ACL access
vs := []*structs.CSIVolListStub{}
for {
raw := iter.Next()
if raw == nil {
break
}
vol := raw.(*structs.CSIVolume)
paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions,
func(raw interface{}) error {
vol := raw.(*structs.CSIVolume)
// Remove (possibly again) by PluginID to handle passing both
// NodeID and PluginID
if args.PluginID != "" && args.PluginID != vol.PluginID {
continue
}
vol, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy())
if err != nil {
return err
}
// Remove by Namespace, since CSIVolumesByNodeID hasn't used
// the Namespace yet
if ns != structs.AllNamespacesSentinel && vol.Namespace != ns {
continue
}
vol, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy())
if err != nil {
return err
}
vs = append(vs, vol.Stub())
vs = append(vs, vol.Stub())
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
nextToken, err := paginator.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
reply.QueryMeta.NextToken = nextToken
reply.Volumes = vs
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}}

View File

@ -753,6 +753,200 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) {
require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace)
}
// TestCSIVolumeEndpoint_List_PaginationFiltering exercises pagination,
// prefix matching, and go-bexpr filtering for the CSIVolume.List RPC,
// including cross-namespace queries and resumption from a NextToken that
// points at a deleted (missing) volume.
func TestCSIVolumeEndpoint_List_PaginationFiltering(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	nonDefaultNS := "non-default"

	// create a set of volumes. these are in the order that the state store
	// will return them from the iterator (sorted by create index), for ease of
	// writing tests
	mocks := []struct {
		id        string
		namespace string
	}{
		{id: "vol-01"},                          // 0
		{id: "vol-02"},                          // 1
		{id: "vol-03", namespace: nonDefaultNS}, // 2
		{id: "vol-04"},                          // 3
		{id: "vol-05"},                          // 4
		{id: "vol-06"},                          // 5
		{id: "vol-07"},                          // 6
		{id: "vol-08"},                          // 7
		{},                                      // 8, deliberately missing volume ("vol-09")
		{id: "vol-10"},                          // 9
	}

	state := s1.fsm.State()
	plugin := mock.CSIPlugin()

	// Create namespaces.
	err := state.UpsertNamespaces(999, []*structs.Namespace{{Name: nonDefaultNS}})
	require.NoError(t, err)

	for i, m := range mocks {
		if m.id == "" {
			continue
		}

		volume := mock.CSIVolume(plugin)
		volume.ID = m.id
		if m.namespace != "" { // defaults to "default"
			volume.Namespace = m.namespace
		}
		index := 1000 + uint64(i)
		require.NoError(t, state.CSIVolumeRegister(index, []*structs.CSIVolume{volume}))
	}

	cases := []struct {
		name              string
		namespace         string
		prefix            string
		filter            string
		nextToken         string
		pageSize          int32
		expectedNextToken string
		expectedIDs       []string
		expectedError     string
	}{
		{
			name:              "test01 size-2 page-1 default NS",
			pageSize:          2,
			expectedNextToken: "default.vol-04",
			expectedIDs: []string{
				"vol-01",
				"vol-02",
			},
		},
		{
			name:              "test02 size-2 page-1 default NS with prefix",
			prefix:            "vol",
			pageSize:          2,
			expectedNextToken: "default.vol-04",
			expectedIDs: []string{
				"vol-01",
				"vol-02",
			},
		},
		{
			name:              "test03 size-2 page-2 default NS",
			pageSize:          2,
			nextToken:         "default.vol-04",
			expectedNextToken: "default.vol-06",
			expectedIDs: []string{
				"vol-04",
				"vol-05",
			},
		},
		{
			name:              "test04 size-2 page-2 default NS with prefix",
			prefix:            "vol",
			pageSize:          2,
			nextToken:         "default.vol-04",
			expectedNextToken: "default.vol-06",
			expectedIDs: []string{
				"vol-04",
				"vol-05",
			},
		},
		{
			name:        "test05 no valid results with filters and prefix",
			prefix:      "cccc",
			pageSize:    2,
			nextToken:   "",
			expectedIDs: []string{},
		},
		{
			name:      "test06 go-bexpr filter",
			namespace: "*",
			filter:    `ID matches "^vol-0[123]"`,
			expectedIDs: []string{
				"vol-01",
				"vol-02",
				"vol-03",
			},
		},
		{
			name:              "test07 go-bexpr filter with pagination",
			namespace:         "*",
			filter:            `ID matches "^vol-0[123]"`,
			pageSize:          2,
			expectedNextToken: "non-default.vol-03",
			expectedIDs: []string{
				"vol-01",
				"vol-02",
			},
		},
		{
			name:      "test08 go-bexpr filter in namespace",
			namespace: "non-default",
			filter:    `Provider == "com.hashicorp:mock"`,
			expectedIDs: []string{
				"vol-03",
			},
		},
		{
			name:        "test09 go-bexpr wrong namespace",
			namespace:   "default",
			filter:      `Namespace == "non-default"`,
			expectedIDs: []string{},
		},
		{
			name:          "test10 go-bexpr invalid expression",
			filter:        `NotValid`,
			expectedError: "failed to read filter expression",
		},
		{
			name:          "test11 go-bexpr invalid field",
			filter:        `InvalidField == "value"`,
			expectedError: "error finding value in datum",
		},
		{
			name:      "test14 missing volume",
			pageSize:  1,
			nextToken: "default.vol-09",
			expectedIDs: []string{
				"vol-10",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			req := &structs.CSIVolumeListRequest{
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					Namespace: tc.namespace,
					Prefix:    tc.prefix,
					Filter:    tc.filter,
					PerPage:   tc.pageSize,
					NextToken: tc.nextToken,
				},
			}
			var resp structs.CSIVolumeListResponse
			err := msgpackrpc.CallWithCodec(codec, "CSIVolume.List", req, &resp)
			if tc.expectedError == "" {
				require.NoError(t, err)
			} else {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.expectedError)
				return
			}

			gotIDs := []string{}
			// NOTE: the loop variable was previously named "deployment", a
			// copy-paste leftover from the deployments pagination test; these
			// are volume stubs.
			for _, vol := range resp.Volumes {
				gotIDs = append(gotIDs, vol.ID)
			}
			require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of volumes")
			require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken")
		})
	}
}
func TestCSIVolumeEndpoint_Create(t *testing.T) {
t.Parallel()
var err error

View File

@ -10,33 +10,10 @@ import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
)
// DeploymentPaginationIterator adapts a go-memdb iterator over deployments to
// the paginator Iterator interface, producing a pagination token per element.
type DeploymentPaginationIterator struct {
	iter          memdb.ResultIterator
	byCreateIndex bool
}

// Next returns the pagination token and the next deployment, or ("", nil)
// once the underlying iterator is exhausted.
func (it DeploymentPaginationIterator) Next() (string, interface{}) {
	raw := it.iter.Next()
	if raw == nil {
		return "", nil
	}

	deployment := raw.(*structs.Deployment)

	// When iterating in create-index order, prefix the token with CreateIndex
	// so the tokens themselves stay properly sorted.
	if it.byCreateIndex {
		return fmt.Sprintf("%v-%v", deployment.CreateIndex, deployment.ID), deployment
	}
	return deployment.ID, deployment
}
// Deployment endpoint is used for manipulating deployments
type Deployment struct {
srv *Server
@ -426,6 +403,7 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
}
// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
@ -433,26 +411,34 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
var deploymentIter DeploymentPaginationIterator
var opts paginator.StructsTokenizerOptions
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
deploymentIter.byCreateIndex = false
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.Ascending)
deploymentIter.byCreateIndex = true
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = store.Deployments(ws, args.Ascending)
deploymentIter.byCreateIndex = true
iter, err = store.Deployments(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
}
if err != nil {
return err
}
deploymentIter.iter = iter
tokenizer := paginator.NewStructsTokenizer(iter, opts)
var deploys []*structs.Deployment
paginator, err := state.NewPaginator(deploymentIter, args.QueryOptions,
paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
func(raw interface{}) error {
deploy := raw.(*structs.Deployment)
deploys = append(deploys, deploy)

View File

@ -1066,13 +1066,12 @@ func TestDeploymentEndpoint_List_order(t *testing.T) {
err = s1.fsm.State().UpsertDeployment(1003, dep2)
require.NoError(t, err)
t.Run("ascending", func(t *testing.T) {
t.Run("default", func(t *testing.T) {
// Lookup the deployments in chronological order (oldest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Ascending: true,
},
}
@ -1093,13 +1092,13 @@ func TestDeploymentEndpoint_List_order(t *testing.T) {
require.Equal(t, uuid3, resp.Deployments[2].ID)
})
t.Run("descending", func(t *testing.T) {
t.Run("reverse", func(t *testing.T) {
// Lookup the deployments in reverse chronological order (newest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Ascending: false,
Reverse: true,
},
}
@ -1312,7 +1311,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
@ -1331,8 +1330,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
{
name: "test03 size-2 page-2 default NS",
pageSize: 2,
nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1005-aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
"aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
@ -1353,8 +1352,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
name: "test05 size-2 page-2 all namespaces",
namespace: "*",
pageSize: 2,
nextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1004-aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
@ -1382,7 +1381,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
namespace: "*",
filter: `ID matches "^a+[123]"`,
pageSize: 2,
expectedNextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
@ -1415,8 +1414,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
{
name: "test13 non-lexicographic order",
pageSize: 1,
nextToken: "1007-00000111-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1009-bbbb1111-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1007.00000111-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1009.bbbb1111-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"00000111-3350-4b4b-d185-0e1992ed43e9",
},
@ -1424,7 +1423,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
{
name: "test14 missing index",
pageSize: 1,
nextToken: "1008-e9522802-0cd8-4b1d-9c9e-ab3d97938371",
nextToken: "1008.e9522802-0cd8-4b1d-9c9e-ab3d97938371",
expectedIDs: []string{
"bbbb1111-3350-4b4b-d185-0e1992ed43e9",
},
@ -1441,7 +1440,6 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
Filter: tc.filter,
PerPage: tc.pageSize,
NextToken: tc.nextToken,
Ascending: true, // counting up is easier to think about
},
}
req.AuthToken = aclToken

View File

@ -202,9 +202,9 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
}
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Deployments(ws, false)
iter, err := store.Deployments(ws, state.SortDefault)
if err != nil {
return nil, 0, err
}
@ -220,7 +220,7 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (in
}
// Use the last index that affected the deployment table
index, err := state.Index("deployment")
index, err := store.Index("deployment")
if err != nil {
return nil, 0, err
}

View File

@ -931,9 +931,9 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
}
// Wait for the two allocations to be placed
state := s1.State()
store := s1.State()
testutil.WaitForResult(func() (bool, error) {
iter, err := state.Allocs(nil)
iter, err := store.Allocs(nil, state.SortDefault)
if err != nil {
return false, err
}
@ -974,11 +974,11 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger)
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
allocs, err := store.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
@ -992,7 +992,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
@ -1002,7 +1002,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
})
// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
require.NoError(err)
// sometimes test gets a duplicate node drain complete event
require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)
@ -21,30 +22,6 @@ const (
DefaultDequeueTimeout = time.Second
)
// EvalPaginationIterator adapts a go-memdb iterator over evaluations to the
// paginator Iterator interface, producing a pagination token per element.
type EvalPaginationIterator struct {
	iter          memdb.ResultIterator
	byCreateIndex bool
}

// Next returns the pagination token and the next evaluation, or ("", nil)
// once the underlying iterator is exhausted.
func (it EvalPaginationIterator) Next() (string, interface{}) {
	raw := it.iter.Next()
	if raw == nil {
		return "", nil
	}

	evaluation := raw.(*structs.Evaluation)

	// When iterating in create-index order, prefix the token with CreateIndex
	// so the tokens themselves stay properly sorted.
	if it.byCreateIndex {
		return fmt.Sprintf("%v-%v", evaluation.CreateIndex, evaluation.ID), evaluation
	}
	return evaluation.ID, evaluation
}
// Eval endpoint is used for eval interactions
type Eval struct {
srv *Server
@ -431,6 +408,7 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
}
// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
@ -438,17 +416,25 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
var evalIter EvalPaginationIterator
var opts paginator.StructsTokenizerOptions
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
evalIter.byCreateIndex = false
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.Ascending)
evalIter.byCreateIndex = true
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = store.Evals(ws, args.Ascending)
evalIter.byCreateIndex = true
iter, err = store.Evals(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
}
if err != nil {
return err
@ -460,10 +446,11 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
}
return false
})
evalIter.iter = iter
tokenizer := paginator.NewStructsTokenizer(iter, opts)
var evals []*structs.Evaluation
paginator, err := state.NewPaginator(evalIter, args.QueryOptions,
paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
func(raw interface{}) error {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)

View File

@ -751,40 +751,12 @@ func TestEvalEndpoint_List_order(t *testing.T) {
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
require.NoError(t, err)
t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order
t.Run("default", func(t *testing.T) {
// Lookup the evaluations in the default order (oldest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Ascending: false,
},
}
var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)
require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)
require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})
t.Run("ascending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order (newest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Ascending: true,
},
}
@ -805,13 +777,13 @@ func TestEvalEndpoint_List_order(t *testing.T) {
require.Equal(t, uuid3, resp.Evaluations[2].ID)
})
t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in chronological order (oldest first)
t.Run("reverse", func(t *testing.T) {
// Lookup the evaluations in reverse order (newest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
Ascending: false,
Reverse: true,
},
}
@ -831,7 +803,6 @@ func TestEvalEndpoint_List_order(t *testing.T) {
require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})
}
func TestEvalEndpoint_ListAllNamespaces(t *testing.T) {
@ -1084,7 +1055,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace
expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace
},
{
name: "test02 size-2 page-1 default NS with prefix",
@ -1099,8 +1070,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
{
name: "test03 size-2 page-2 default NS",
pageSize: 2,
nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1005-aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
"aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
@ -1123,7 +1094,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
filterJobID: "example",
filterStatus: "pending",
// aaaaaaaa, bb, and cc are filtered by status
expectedNextToken: "1006-aaaaaadd-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1006.aaaaaadd-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
@ -1159,7 +1130,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
pageSize: 3, // reads off the end
filterJobID: "example",
filterStatus: "pending",
nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "",
expectedIDs: []string{
"aaaaaadd-3350-4b4b-d185-0e1992ed43e9",
@ -1183,8 +1154,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
name: "test10 size-2 page-2 all namespaces",
namespace: "*",
pageSize: 2,
nextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1004-aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaaaa33-3350-4b4b-d185-0e1992ed43e9",
"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
@ -1228,7 +1199,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
name: "test16 go-bexpr filter with pagination",
filter: `JobID == "example"`,
pageSize: 2,
expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
@ -1267,8 +1238,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
{
name: "test22 non-lexicographic order",
pageSize: 1,
nextToken: "1009-00000111-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1010-00000222-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1009.00000111-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"00000111-3350-4b4b-d185-0e1992ed43e9",
},
@ -1276,8 +1247,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
{
name: "test23 same index",
pageSize: 1,
nextToken: "1010-00000222-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1010-00000333-3350-4b4b-d185-0e1992ed43e9",
nextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9",
expectedNextToken: "1010.00000333-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
"00000222-3350-4b4b-d185-0e1992ed43e9",
},
@ -1285,7 +1256,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
{
name: "test24 missing index",
pageSize: 1,
nextToken: "1011-e9522802-0cd8-4b1d-9c9e-ab3d97938371",
nextToken: "1011.e9522802-0cd8-4b1d-9c9e-ab3d97938371",
expectedIDs: []string{
"bbbb1111-3350-4b4b-d185-0e1992ed43e9",
},
@ -1304,7 +1275,6 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
PerPage: tc.pageSize,
NextToken: tc.nextToken,
Filter: tc.filter,
Ascending: true, // counting up is easier to think about
},
}
req.AuthToken = aclToken

View File

@ -1704,16 +1704,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
func (n *nomadFSM) failLeakedDeployments(store *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil, false)
iter, err := store.Deployments(nil, state.SortDefault)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}
dindex, err := state.Index("deployment")
dindex, err := store.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}
@ -1733,7 +1733,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
}
// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
job, err := store.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}
@ -1747,7 +1747,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
if err := store.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}
@ -2099,7 +2099,7 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
ws := memdb.NewWatchSet()
allocs, err := s.snap.Allocs(ws)
allocs, err := s.snap.Allocs(ws, state.SortDefault)
if err != nil {
return err
}
@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws, false)
deployments, err := s.snap.Deployments(ws, state.SortDefault)
if err != nil {
return err
}
@ -2306,7 +2306,7 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the policies
ws := memdb.NewWatchSet()
tokens, err := s.snap.ACLTokens(ws)
tokens, err := s.snap.ACLTokens(ws, state.SortDefault)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package nomad
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"time"
@ -20,6 +21,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)
@ -1339,15 +1341,29 @@ func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse)
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
if args.RequestNamespace() == structs.AllNamespacesSentinel {
return j.listAllNamespaces(args, reply)
}
namespace := args.RequestNamespace()
var allow func(string) bool
// Check for list-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
aclObj, err := j.srv.ResolveToken(args.AuthToken)
switch {
case err != nil:
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) {
case aclObj == nil:
allow = func(string) bool {
return true
}
case namespace == structs.AllNamespacesSentinel:
allow = func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
case !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityListJobs):
return structs.ErrPermissionDenied
default:
allow = func(string) bool {
return true
}
}
// Setup the blocking query
@ -1358,106 +1374,65 @@ func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse)
// Capture all the jobs
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.JobsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
iter, err = state.JobsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
summary, err := state.JobSummaryByID(ws, args.RequestNamespace(), job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
jobs = append(jobs, job.Stub(summary))
}
reply.Jobs = jobs
// Use the last index that affected the jobs table or summary
jindex, err := state.Index("jobs")
if err != nil {
return err
}
sindex, err := state.Index("job_summary")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(jindex, sindex)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// listAllNamespaces lists all jobs across all namespaces
func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.JobListResponse) error {
// Check for list-job permissions
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state, allow)
allowableNamespaces, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty jobs if token isn't authorized for any
// namespace, matching other endpoints
reply.Jobs = []*structs.JobListStub{}
return nil
reply.Jobs = make([]*structs.JobListStub, 0)
} else if err != nil {
return err
}
// Capture all the jobs
iter, err := state.Jobs(ws)
if err != nil {
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
} else {
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.JobsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = state.JobsByNamespace(ws, namespace)
} else {
iter, err = state.Jobs(ws)
}
job := raw.(*structs.Job)
if allowedNSes != nil && !allowedNSes[job.Namespace] {
// not permitted to this name namespace
continue
}
if prefix != "" && !strings.HasPrefix(job.ID, prefix) {
continue
}
summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
return err
}
stub := job.Stub(summary)
jobs = append(jobs, stub)
tokenizer := paginator.NewStructsTokenizer(
iter,
paginator.StructsTokenizerOptions{
WithNamespace: true,
WithID: true,
},
)
filters := []paginator.Filter{
paginator.NamespaceFilter{
AllowableNamespaces: allowableNamespaces,
},
}
var jobs []*structs.JobListStub
paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions,
func(raw interface{}) error {
job := raw.(*structs.Job)
summary, err := state.JobSummaryByID(ws, namespace, job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
jobs = append(jobs, job.Stub(summary))
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
nextToken, err := paginator.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
reply.QueryMeta.NextToken = nextToken
reply.Jobs = jobs
}
reply.Jobs = jobs
// Use the last index that affected the jobs table or summary
jindex, err := state.Index("jobs")
@ -1475,7 +1450,6 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Allocations is used to list the allocations for a job

View File

@ -5151,6 +5151,184 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) {
}
}
// TestJobEndpoint_ListJobs_PaginationFiltering exercises the paginated,
// prefix-matched, and go-bexpr-filtered variants of the Job.List RPC,
// including cross-namespace (wildcard) listing and resuming from a
// NextToken that no longer exists in the state store.
func TestJobEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
	t.Parallel()
	s1, _, cleanupS1 := TestACLServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// create a set of jobs. these are in the order that the state store will
	// return them from the iterator (sorted by key) for ease of writing tests
	mocks := []struct {
		name      string
		namespace string
		status    string
	}{
		{name: "job-01"}, // 0
		{name: "job-02"}, // 1
		{name: "job-03", namespace: "non-default"}, // 2
		{name: "job-04"}, // 3
		{name: "job-05", status: structs.JobStatusRunning}, // 4
		{name: "job-06", status: structs.JobStatusRunning}, // 5
		{},                // 6, missing job
		{name: "job-08"}, // 7
		{name: "job-03", namespace: "other"}, // 8, same name but in another namespace
	}

	state := s1.fsm.State()
	require.NoError(t, state.UpsertNamespaces(999, []*structs.Namespace{{Name: "non-default"}, {Name: "other"}}))

	// Upsert each mock job at a distinct raft index so tokens based on
	// CreateIndex are deterministic; the empty mock (index 6) is skipped to
	// create a deliberate gap for the "missing index" case below.
	for i, m := range mocks {
		if m.name == "" {
			continue
		}

		index := 1000 + uint64(i)
		job := mock.Job()
		job.ID = m.name
		job.Name = m.name
		job.Status = m.status
		if m.namespace != "" { // defaults to "default"
			job.Namespace = m.namespace
		}
		job.CreateIndex = index
		require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, index, job))
	}

	// Token with read access to every namespace, so wildcard listings are
	// only constrained by the request parameters rather than ACLs.
	aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read",
		mock.NamespacePolicy("*", "read", nil)).
		SecretID

	cases := []struct {
		name              string
		namespace         string
		prefix            string
		filter            string
		nextToken         string
		pageSize          int32
		expectedNextToken string
		expectedIDs       []string
		expectedError     string
	}{
		{
			name:              "test01 size-2 page-1 default NS",
			pageSize:          2,
			expectedNextToken: "default.job-04",
			expectedIDs:       []string{"job-01", "job-02"},
		},
		{
			name:              "test02 size-2 page-1 default NS with prefix",
			prefix:            "job",
			pageSize:          2,
			expectedNextToken: "default.job-04",
			expectedIDs:       []string{"job-01", "job-02"},
		},
		{
			name:              "test03 size-2 page-2 default NS",
			pageSize:          2,
			nextToken:         "default.job-04",
			expectedNextToken: "default.job-06",
			expectedIDs:       []string{"job-04", "job-05"},
		},
		{
			name:              "test04 size-2 page-2 default NS with prefix",
			prefix:            "job",
			pageSize:          2,
			nextToken:         "default.job-04",
			expectedNextToken: "default.job-06",
			expectedIDs:       []string{"job-04", "job-05"},
		},
		{
			name:        "test05 no valid results with filters and prefix",
			prefix:      "not-job",
			pageSize:    2,
			nextToken:   "",
			expectedIDs: []string{},
		},
		{
			name:        "test06 go-bexpr filter",
			namespace:   "*",
			filter:      `Name matches "job-0[123]"`,
			expectedIDs: []string{"job-01", "job-02", "job-03", "job-03"},
		},
		{
			name:              "test07 go-bexpr filter with pagination",
			namespace:         "*",
			filter:            `Name matches "job-0[123]"`,
			pageSize:          2,
			expectedNextToken: "non-default.job-03",
			expectedIDs:       []string{"job-01", "job-02"},
		},
		{
			name:        "test08 go-bexpr filter in namespace",
			namespace:   "non-default",
			filter:      `Status == "pending"`,
			expectedIDs: []string{"job-03"},
		},
		{
			name:          "test09 go-bexpr invalid expression",
			filter:        `NotValid`,
			expectedError: "failed to read filter expression",
		},
		{
			name:          "test10 go-bexpr invalid field",
			filter:        `InvalidField == "value"`,
			expectedError: "error finding value in datum",
		},
		{
			// Resumes from a token whose job was never written (the gap
			// left at index 6); pagination should continue at the next key.
			name:      "test11 missing index",
			pageSize:  1,
			nextToken: "default.job-07",
			expectedIDs: []string{
				"job-08",
			},
		},
		{
			name:              "test12 same name but different NS",
			namespace:         "*",
			pageSize:          1,
			filter:            `Name == "job-03"`,
			expectedNextToken: "other.job-03",
			expectedIDs: []string{
				"job-03",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			req := &structs.JobListRequest{
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					Namespace: tc.namespace,
					Prefix:    tc.prefix,
					Filter:    tc.filter,
					PerPage:   tc.pageSize,
					NextToken: tc.nextToken,
				},
			}
			req.AuthToken = aclToken
			var resp structs.JobListResponse
			err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp)
			if tc.expectedError == "" {
				require.NoError(t, err)
			} else {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.expectedError)
				return
			}

			gotIDs := []string{}
			for _, job := range resp.Jobs {
				gotIDs = append(gotIDs, job.ID)
			}
			require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of jobs")
			require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken")
		})
	}
}
func TestJobEndpoint_Allocations(t *testing.T) {
t.Parallel()

View File

@ -394,42 +394,42 @@ func wildcard(namespace string) bool {
return namespace == structs.AllNamespacesSentinel
}
func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, state *state.StateStore) (memdb.ResultIterator, error) {
func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, store *state.StateStore) (memdb.ResultIterator, error) {
switch context {
case structs.Jobs:
if wildcard(namespace) {
iter, err := state.Jobs(ws)
iter, err := store.Jobs(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.JobsByNamespace(ws, namespace)
return store.JobsByNamespace(ws, namespace)
case structs.Allocs:
if wildcard(namespace) {
iter, err := state.Allocs(ws)
iter, err := store.Allocs(ws, state.SortDefault)
return nsCapIterFilter(iter, err, aclObj)
}
return state.AllocsByNamespace(ws, namespace)
return store.AllocsByNamespace(ws, namespace)
case structs.Nodes:
if wildcard(namespace) {
iter, err := state.Nodes(ws)
iter, err := store.Nodes(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.Nodes(ws)
return store.Nodes(ws)
case structs.Plugins:
if wildcard(namespace) {
iter, err := state.CSIPlugins(ws)
iter, err := store.CSIPlugins(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.CSIPlugins(ws)
return store.CSIPlugins(ws)
case structs.Namespaces:
iter, err := state.Namespaces(ws)
iter, err := store.Namespaces(ws)
return nsCapIterFilter(iter, err, aclObj)
default:
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, state)
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, store)
}
}

View File

@ -0,0 +1,41 @@
package paginator
// Filter is the interface that must be implemented to skip values when using
// the Paginator.
type Filter interface {
	// Evaluate returns true if the element should be added to the page.
	// Returning a non-nil error aborts pagination.
	Evaluate(interface{}) (bool, error)
}
// GenericFilter wraps a function that can be used to provide simple or in
// scope filtering.
type GenericFilter struct {
	// Allow returns true to keep the element on the page, false to skip it.
	Allow func(interface{}) (bool, error)
}

// Evaluate implements the Filter interface by delegating to Allow.
func (f GenericFilter) Evaluate(raw interface{}) (bool, error) {
	return f.Allow(raw)
}
// NamespaceFilter skips elements with a namespace value that is not in the
// allowable set.
type NamespaceFilter struct {
	// AllowableNamespaces is the set of namespaces the requester may see.
	// A nil map means unrestricted access; otherwise only namespaces whose
	// value is true are allowed.
	AllowableNamespaces map[string]bool
}

// Evaluate returns true if the element's namespace is in the allowable set.
// Nil elements and elements that do not implement NamespaceGetter are
// skipped.
func (f NamespaceFilter) Evaluate(raw interface{}) (bool, error) {
	if raw == nil {
		return false, nil
	}
	item, ok := raw.(NamespaceGetter)
	if !ok {
		// Previously the failed type assertion was ignored, leaving item a
		// nil interface and panicking on the GetNamespace call below. Skip
		// such elements instead.
		return false, nil
	}

	// A nil set means the caller has unrestricted namespace access.
	if f.AllowableNamespaces == nil {
		return true, nil
	}
	return f.AllowableNamespaces[item.GetNamespace()], nil
}

View File

@ -1,15 +1,109 @@
package state
package paginator
import (
"testing"
"time"
"github.com/hashicorp/go-bexpr"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestGenericFilter(t *testing.T) {
t.Parallel()
ids := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
filters := []Filter{GenericFilter{
Allow: func(raw interface{}) (bool, error) {
result := raw.(*mockObject)
return result.id > "5", nil
},
}}
iter := newTestIterator(ids)
tokenizer := testTokenizer{}
opts := structs.QueryOptions{
PerPage: 3,
}
results := []string{}
paginator, err := NewPaginator(iter, tokenizer, filters, opts,
func(raw interface{}) error {
result := raw.(*mockObject)
results = append(results, result.id)
return nil
},
)
require.NoError(t, err)
nextToken, err := paginator.Page()
require.NoError(t, err)
expected := []string{"6", "7", "8"}
require.Equal(t, "9", nextToken)
require.Equal(t, expected, results)
}
// TestNamespaceFilter verifies NamespaceFilter behavior for a nil allow-set
// (everything passes) and for allow-sets with one or more enabled entries.
func TestNamespaceFilter(t *testing.T) {
	t.Parallel()

	mocks := []*mockObject{
		{namespace: "default"},
		{namespace: "dev"},
		{namespace: "qa"},
		{namespace: "region-1"},
	}

	cases := []struct {
		name      string
		allowable map[string]bool
		expected  []string
	}{
		{
			name:     "nil map",
			expected: []string{"default", "dev", "qa", "region-1"},
		},
		{
			name:      "allow default",
			allowable: map[string]bool{"default": true},
			expected:  []string{"default"},
		},
		{
			// entries explicitly set to false are filtered out as well
			name:      "allow multiple",
			allowable: map[string]bool{"default": true, "dev": false, "qa": true},
			expected:  []string{"default", "qa"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			filters := []Filter{NamespaceFilter{
				AllowableNamespaces: tc.allowable,
			}}
			iter := newTestIteratorWithMocks(mocks)
			tokenizer := testTokenizer{}

			// PerPage covers every mock so no next token is expected.
			opts := structs.QueryOptions{
				PerPage: int32(len(mocks)),
			}
			results := []string{}
			paginator, err := NewPaginator(iter, tokenizer, filters, opts,
				func(raw interface{}) error {
					result := raw.(*mockObject)
					results = append(results, result.namespace)
					return nil
				},
			)
			require.NoError(t, err)

			nextToken, err := paginator.Page()
			require.NoError(t, err)
			require.Equal(t, "", nextToken)
			require.Equal(t, tc.expected, results)
		})
	}
}
func BenchmarkEvalListFilter(b *testing.B) {
const evalCount = 100_000
@ -76,9 +170,10 @@ func BenchmarkEvalListFilter(b *testing.B) {
for i := 0; i < b.N; i++ {
iter, _ := state.EvalsByNamespace(nil, structs.DefaultNamespace)
evalIter := evalPaginationIterator{iter}
tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true})
var evals []*structs.Evaluation
paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error {
paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
return nil
@ -100,9 +195,10 @@ func BenchmarkEvalListFilter(b *testing.B) {
for i := 0; i < b.N; i++ {
iter, _ := state.Evals(nil, false)
evalIter := evalPaginationIterator{iter}
tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true})
var evals []*structs.Evaluation
paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error {
paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
return nil
@ -137,9 +233,10 @@ func BenchmarkEvalListFilter(b *testing.B) {
for i := 0; i < b.N; i++ {
iter, _ := state.EvalsByNamespace(nil, structs.DefaultNamespace)
evalIter := evalPaginationIterator{iter}
tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true})
var evals []*structs.Evaluation
paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error {
paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
return nil
@ -175,9 +272,10 @@ func BenchmarkEvalListFilter(b *testing.B) {
for i := 0; i < b.N; i++ {
iter, _ := state.Evals(nil, false)
evalIter := evalPaginationIterator{iter}
tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true})
var evals []*structs.Evaluation
paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error {
paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
return nil
@ -193,12 +291,12 @@ func BenchmarkEvalListFilter(b *testing.B) {
// -----------------
// BENCHMARK HELPER FUNCTIONS
func setupPopulatedState(b *testing.B, evalCount int) *StateStore {
func setupPopulatedState(b *testing.B, evalCount int) *state.StateStore {
evals := generateEvals(evalCount)
index := uint64(0)
var err error
state := TestStateStore(b)
state := state.TestStateStore(b)
for _, eval := range evals {
index++
err = state.UpsertEvals(
@ -235,17 +333,3 @@ func generateEval(i int, ns string) *structs.Evaluation {
ModifyTime: now,
}
}
type evalPaginationIterator struct {
iter memdb.ResultIterator
}
func (it evalPaginationIterator) Next() (string, interface{}) {
raw := it.iter.Next()
if raw == nil {
return "", nil
}
eval := raw.(*structs.Evaluation)
return eval.ID, eval
}

View File

@ -1,4 +1,4 @@
package state
package paginator
import (
"fmt"
@ -7,39 +7,37 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// Iterator is the interface that must be implemented to use the Paginator.
// Iterator is the interface that must be implemented to supply data to the
// Paginator.
type Iterator interface {
// Next returns the next element to be considered for pagination along with
// a token string used to uniquely identify elements in the iteration.
// Next returns the next element to be considered for pagination.
// The page will end if nil is returned.
// Tokens should have a stable order and the order must match the paginator
// ascending property.
Next() (string, interface{})
Next() interface{}
}
// Paginator is an iterator over a memdb.ResultIterator that returns
// only the expected number of pages.
// Paginator wraps an iterator and returns only the expected number of pages.
type Paginator struct {
iter Iterator
tokenizer Tokenizer
filters []Filter
perPage int32
itemCount int32
seekingToken string
nextToken string
ascending bool
reverse bool
nextTokenFound bool
pageErr error
// filterEvaluator is used to filter results using go-bexpr. It's nil if
// no filter expression is defined.
filterEvaluator *bexpr.Evaluator
// appendFunc is the function the caller should use to append raw
// entries to the results set. The object is guaranteed to be
// non-nil.
appendFunc func(interface{}) error
}
func NewPaginator(iter Iterator, opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) {
// NewPaginator returns a new Paginator.
func NewPaginator(iter Iterator, tokenizer Tokenizer, filters []Filter,
opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) {
var evaluator *bexpr.Evaluator
var err error
@ -48,21 +46,23 @@ func NewPaginator(iter Iterator, opts structs.QueryOptions, appendFunc func(inte
if err != nil {
return nil, fmt.Errorf("failed to read filter expression: %v", err)
}
filters = append(filters, evaluator)
}
return &Paginator{
iter: iter,
perPage: opts.PerPage,
seekingToken: opts.NextToken,
ascending: opts.Ascending,
nextTokenFound: opts.NextToken == "",
filterEvaluator: evaluator,
appendFunc: appendFunc,
iter: iter,
tokenizer: tokenizer,
filters: filters,
perPage: opts.PerPage,
seekingToken: opts.NextToken,
reverse: opts.Reverse,
nextTokenFound: opts.NextToken == "",
appendFunc: appendFunc,
}, nil
}
// Page populates a page by running the append function
// over all results. Returns the next token
// over all results. Returns the next token.
func (p *Paginator) Page() (string, error) {
DONE:
for {
@ -84,34 +84,36 @@ DONE:
}
func (p *Paginator) next() (interface{}, paginatorState) {
token, raw := p.iter.Next()
raw := p.iter.Next()
if raw == nil {
p.nextToken = ""
return nil, paginatorComplete
}
token := p.tokenizer.GetToken(raw)
// have we found the token we're seeking (if any)?
p.nextToken = token
var passedToken bool
if p.ascending {
passedToken = token < p.seekingToken
} else {
if p.reverse {
passedToken = token > p.seekingToken
} else {
passedToken = token < p.seekingToken
}
if !p.nextTokenFound && passedToken {
return nil, paginatorSkip
}
// apply filter if defined
if p.filterEvaluator != nil {
match, err := p.filterEvaluator.Evaluate(raw)
// apply filters if defined
for _, f := range p.filters {
allow, err := f.Evaluate(raw)
if err != nil {
p.pageErr = err
return nil, paginatorComplete
}
if !match {
if !allow {
return nil, paginatorSkip
}
}

View File

@ -1,4 +1,4 @@
package state
package paginator
import (
"errors"
@ -58,14 +58,14 @@ func TestPaginator(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
iter := newTestIterator(ids)
results := []string{}
tokenizer := testTokenizer{}
opts := structs.QueryOptions{
PerPage: tc.perPage,
NextToken: tc.nextToken,
}
paginator, err := NewPaginator(iter,
structs.QueryOptions{
PerPage: tc.perPage,
NextToken: tc.nextToken,
Ascending: true,
},
results := []string{}
paginator, err := NewPaginator(iter, tokenizer, nil, opts,
func(raw interface{}) error {
if tc.expectedError != "" {
return errors.New(tc.expectedError)
@ -94,27 +94,32 @@ func TestPaginator(t *testing.T) {
// helpers for pagination tests
// implements memdb.ResultIterator interface
// implements Iterator interface
type testResultIterator struct {
results chan interface{}
}
func (i testResultIterator) Next() (string, interface{}) {
func (i testResultIterator) Next() interface{} {
select {
case raw := <-i.results:
if raw == nil {
return "", nil
return nil
}
m := raw.(*mockObject)
return m.id, m
return m
default:
return "", nil
return nil
}
}
type mockObject struct {
id string
id string
namespace string
}
// GetNamespace implements the NamespaceGetter interface so mockObject can be
// used with NamespaceFilter in tests.
func (m *mockObject) GetNamespace() string {
	return m.namespace
}
func newTestIterator(ids []string) testResultIterator {
@ -124,3 +129,18 @@ func newTestIterator(ids []string) testResultIterator {
}
return iter
}
// newTestIteratorWithMocks builds a testResultIterator pre-loaded with the
// given mock objects, preserving their order.
func newTestIteratorWithMocks(mocks []*mockObject) testResultIterator {
	// Buffer the channel for at least every mock so the sends below can
	// never block; the previous fixed capacity of 20 would deadlock the
	// test if a caller passed more than 20 mocks.
	capacity := 20
	if len(mocks) > capacity {
		capacity = len(mocks)
	}
	iter := testResultIterator{results: make(chan interface{}, capacity)}
	for _, m := range mocks {
		iter.results <- m
	}
	return iter
}
// testTokenizer implements the Tokenizer interface for tests, using a
// mockObject's id field as its pagination token.
type testTokenizer struct{}

// GetToken returns the mock's id as its pagination token.
func (t testTokenizer) GetToken(raw interface{}) string {
	return raw.(*mockObject).id
}

View File

@ -0,0 +1,82 @@
package paginator
import (
"fmt"
"strings"
)
// Tokenizer is the interface that must be implemented to provide pagination
// tokens to the Paginator.
type Tokenizer interface {
	// GetToken returns the pagination token for the given element.
	GetToken(interface{}) string
}

// IDGetter is the interface that must be implemented by structs that need to
// have their ID as part of the pagination token.
type IDGetter interface {
	GetID() string
}

// NamespaceGetter is the interface that must be implemented by structs that
// need to have their Namespace as part of the pagination token.
type NamespaceGetter interface {
	GetNamespace() string
}

// CreateIndexGetter is the interface that must be implemented by structs that
// need to have their CreateIndex as part of the pagination token.
type CreateIndexGetter interface {
	GetCreateIndex() uint64
}

// StructsTokenizerOptions is the configuration provided to a StructsTokenizer.
// Each flag adds the corresponding field to the generated token.
type StructsTokenizerOptions struct {
	WithCreateIndex bool // include the element's CreateIndex
	WithNamespace   bool // include the element's Namespace
	WithID          bool // include the element's ID
}

// StructsTokenizer is a pagination token generator that can create different
// formats of pagination tokens based on common fields found in the structs
// package.
type StructsTokenizer struct {
	opts StructsTokenizerOptions
}
// NewStructsTokenizer returns a new StructsTokenizer.
//
// The Iterator argument is accepted for call-site convenience but is not
// used; only opts affects the tokens that are generated.
func NewStructsTokenizer(it Iterator, opts StructsTokenizerOptions) StructsTokenizer {
	return StructsTokenizer{
		opts: opts,
	}
}
// GetToken returns the pagination token for raw, joining the fields selected
// in the tokenizer options with a "." separator. A nil input yields "".
func (t StructsTokenizer) GetToken(raw interface{}) string {
	if raw == nil {
		return ""
	}

	var fields []string
	if t.opts.WithCreateIndex {
		fields = append(fields, fmt.Sprintf("%v", raw.(CreateIndexGetter).GetCreateIndex()))
	}
	if t.opts.WithNamespace {
		fields = append(fields, raw.(NamespaceGetter).GetNamespace())
	}
	if t.opts.WithID {
		fields = append(fields, raw.(IDGetter).GetID())
	}

	// Use a character that is not part of validNamespaceName as separator to
	// avoid accidentally generating collisions: namespace `a` and job `b-c`
	// would collide with namespace `a-b` and job `c` as `a-b-c`, since `-`
	// is an allowed character in namespaces.
	return strings.Join(fields, ".")
}

View File

@ -0,0 +1,67 @@
package paginator
import (
"fmt"
"testing"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/stretchr/testify/require"
)
// TestStructsTokenizer verifies that every supported combination of
// StructsTokenizerOptions produces a "."-joined token built from the job's
// CreateIndex, Namespace, and ID fields, in that order.
func TestStructsTokenizer(t *testing.T) {
	j := mock.Job()

	cases := []struct {
		name     string
		opts     StructsTokenizerOptions
		expected string
	}{
		{
			name: "ID",
			opts: StructsTokenizerOptions{
				WithID: true,
			},
			expected: fmt.Sprintf("%v", j.ID),
		},
		{
			name: "Namespace.ID",
			opts: StructsTokenizerOptions{
				WithNamespace: true,
				WithID:        true,
			},
			expected: fmt.Sprintf("%v.%v", j.Namespace, j.ID),
		},
		{
			name: "CreateIndex.Namespace.ID",
			opts: StructsTokenizerOptions{
				WithCreateIndex: true,
				WithNamespace:   true,
				WithID:          true,
			},
			expected: fmt.Sprintf("%v.%v.%v", j.CreateIndex, j.Namespace, j.ID),
		},
		{
			name: "CreateIndex.ID",
			opts: StructsTokenizerOptions{
				WithCreateIndex: true,
				WithID:          true,
			},
			expected: fmt.Sprintf("%v.%v", j.CreateIndex, j.ID),
		},
		{
			name: "CreateIndex.Namespace",
			opts: StructsTokenizerOptions{
				WithCreateIndex: true,
				WithNamespace:   true,
			},
			expected: fmt.Sprintf("%v.%v", j.CreateIndex, j.Namespace),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			tokenizer := StructsTokenizer{opts: tc.opts}
			require.Equal(t, tc.expected, tokenizer.GetToken(j))
		})
	}
}

View File

@ -530,7 +530,7 @@ func allocTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "allocs",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is a UUID
// id index is used for direct lookup of allocation by ID.
"id": {
Name: "id",
AllowMissing: false,
@ -540,6 +540,26 @@ func allocTableSchema() *memdb.TableSchema {
},
},
// create index is used for listing allocations, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
"create": {
Name: "create",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
&memdb.StringFieldIndex{
Field: "ID",
},
},
},
},
// namespace is used to lookup evaluations by namespace.
// todo(shoenig): i think we can deprecate this and other like it
"namespace": {
Name: "namespace",
AllowMissing: false,
@ -549,6 +569,31 @@ func allocTableSchema() *memdb.TableSchema {
},
},
// namespace_create index is used to lookup evaluations by namespace
// in their original chronological order based on CreateIndex.
//
// Use a prefix iterator (namespace_prefix) on a Namespace to iterate
// those evaluations in order of CreateIndex.
"namespace_create": {
Name: "namespace_create",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
AllowMissing: false,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
&memdb.StringFieldIndex{
Field: "ID",
},
},
},
},
// Node index is used to lookup allocations by node
"node": {
Name: "node",
@ -728,6 +773,21 @@ func aclTokenTableSchema() *memdb.TableSchema {
Field: "AccessorID",
},
},
"create": {
Name: "create",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
&memdb.StringFieldIndex{
Field: "AccessorID",
},
},
},
},
"secret": {
Name: "secret",
AllowMissing: false,

View File

@ -22,6 +22,19 @@ import (
// This can be a read or write transaction.
type Txn = *txn
// SortOption represents how results can be sorted.
type SortOption bool
const (
// SortDefault indicates that the result should be returned using the
// default go-memdb ResultIterator order.
SortDefault SortOption = false
// SortReverse indicates that the result should be returned using the
// reversed go-memdb ResultIterator order.
SortReverse SortOption = true
)
const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
@ -544,16 +557,17 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
return nil
}
func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) {
func (s *StateStore) Deployments(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var it memdb.ResultIterator
var err error
if ascending {
it, err = txn.Get("deployment", "create")
} else {
switch sort {
case SortReverse:
it, err = txn.GetReverse("deployment", "create")
default:
it, err = txn.Get("deployment", "create")
}
if err != nil {
@ -578,7 +592,7 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string)
return iter, nil
}
func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var (
@ -587,10 +601,11 @@ func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace
exact = terminate(namespace)
)
if ascending {
it, err = txn.Get("deployment", "namespace_create_prefix", exact)
} else {
switch sort {
case SortReverse:
it, err = txn.GetReverse("deployment", "namespace_create_prefix", exact)
default:
it, err = txn.Get("deployment", "namespace_create_prefix", exact)
}
if err != nil {
@ -1919,8 +1934,13 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn
return nil, nil
}
// JobsByIDPrefix is used to lookup a job by prefix
// JobsByIDPrefix is used to lookup a job by prefix. If querying all namespaces
// the prefix will not be filtered by an index.
func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
if namespace == structs.AllNamespacesSentinel {
return s.jobsByIDPrefixAllNamespaces(ws, id)
}
txn := s.db.ReadTxn()
iter, err := txn.Get("jobs", "id_prefix", namespace, id)
@ -1933,6 +1953,30 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me
return iter, nil
}
// jobsByIDPrefixAllNamespaces returns an iterator over every job whose ID
// starts with prefix, regardless of namespace.
func (s *StateStore) jobsByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// There is no cross-namespace prefix index, so walk the entire jobs
	// table and filter by ID prefix as we go.
	all, err := txn.Get("jobs", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(all.WatchCh())

	// NewFilterIterator drops elements for which the function returns true.
	dropMismatch := func(raw interface{}) bool {
		job, ok := raw.(*structs.Job)
		if !ok {
			return true
		}
		return !strings.HasPrefix(job.ID, prefix)
	}

	return memdb.NewFilterIterator(all, dropMismatch), nil
}
// JobVersionsByID returns all the tracked versions of a job.
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
txn := s.db.ReadTxn()
@ -3188,17 +3232,18 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*
}
// Evals returns an iterator over all the evaluations in ascending or descending
// order of CreationIndex as determined by the ascending parameter.
func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) {
// order of CreationIndex as determined by the reverse parameter.
func (s *StateStore) Evals(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var it memdb.ResultIterator
var err error
if ascending {
it, err = txn.Get("evals", "create")
} else {
switch sort {
case SortReverse:
it, err = txn.GetReverse("evals", "create")
default:
it, err = txn.Get("evals", "create")
}
if err != nil {
@ -3227,7 +3272,7 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd
return it, nil
}
func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var (
@ -3236,10 +3281,11 @@ func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string
exact = terminate(namespace)
)
if ascending {
it, err = txn.Get("evals", "namespace_create_prefix", exact)
} else {
switch sort {
case SortReverse:
it, err = txn.GetReverse("evals", "namespace_create_prefix", exact)
default:
it, err = txn.Get("evals", "namespace_create_prefix", exact)
}
if err != nil {
@ -3609,6 +3655,10 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool {
return true
}
if namespace == structs.AllNamespacesSentinel {
return false
}
return alloc.Namespace != namespace
}
}
@ -3766,19 +3816,52 @@ func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string)
return out, nil
}
// Allocs returns an iterator over all the evaluations
func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// Allocs returns an iterator over all the evaluations.
func (s *StateStore) Allocs(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("allocs", "id")
var it memdb.ResultIterator
var err error
switch sort {
case SortReverse:
it, err = txn.GetReverse("allocs", "create")
default:
it, err = txn.Get("allocs", "create")
}
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
ws.Add(it.WatchCh())
return iter, nil
return it, nil
}
// AllocsByNamespaceOrdered returns an iterator over the allocations in the
// given namespace using the compound namespace_create index, so results come
// back in CreateIndex order. Pass SortReverse for newest-first.
func (s *StateStore) AllocsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// terminate() presumably appends a terminator so the prefix lookup
	// matches the namespace exactly — same pattern as the other *Ordered
	// state store queries.
	exact := terminate(namespace)

	var iter memdb.ResultIterator
	var err error
	if sort == SortReverse {
		iter, err = txn.GetReverse("allocs", "namespace_create_prefix", exact)
	} else {
		iter, err = txn.Get("allocs", "namespace_create_prefix", exact)
	}
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}
// AllocsByNamespace returns an iterator over all the allocations in the
@ -5464,14 +5547,22 @@ func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string
}
// ACLTokens returns an iterator over all the tokens
func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) {
func (s *StateStore) ACLTokens(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("acl_token", "id")
var iter memdb.ResultIterator
var err error
switch sort {
case SortReverse:
iter, err = txn.GetReverse("acl_token", "create")
default:
iter, err = txn.Get("acl_token", "create")
}
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}

View File

@ -670,7 +670,7 @@ func TestStateStore_Deployments(t *testing.T) {
}
ws := memdb.NewWatchSet()
it, err := state.Deployments(ws, true)
it, err := state.Deployments(ws, SortDefault)
require.NoError(t, err)
var out []*structs.Deployment
@ -5432,7 +5432,7 @@ func TestStateStore_Allocs(t *testing.T) {
}
ws := memdb.NewWatchSet()
iter, err := state.Allocs(ws)
iter, err := state.Allocs(ws, SortDefault)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -5480,7 +5480,7 @@ func TestStateStore_Allocs_PrevAlloc(t *testing.T) {
require.Nil(err)
ws := memdb.NewWatchSet()
iter, err := state.Allocs(ws)
iter, err := state.Allocs(ws, SortDefault)
require.Nil(err)
var out []*structs.Allocation
@ -7508,7 +7508,7 @@ func TestStateStore_BootstrapACLTokens(t *testing.T) {
t.Fatalf("expected error")
}
iter, err := state.ACLTokens(nil)
iter, err := state.ACLTokens(nil, SortDefault)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -7602,7 +7602,7 @@ func TestStateStore_UpsertACLTokens(t *testing.T) {
assert.Equal(t, nil, err)
assert.Equal(t, tk2, out)
iter, err := state.ACLTokens(ws)
iter, err := state.ACLTokens(ws, SortDefault)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -7669,7 +7669,7 @@ func TestStateStore_DeleteACLTokens(t *testing.T) {
t.Fatalf("bad: %#v", out)
}
iter, err := state.ACLTokens(ws)
iter, err := state.ACLTokens(ws, SortDefault)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -318,6 +318,32 @@ type CSIVolume struct {
ModifyIndex uint64
}
// GetID implements the IDGetter interface, required for pagination.
func (v *CSIVolume) GetID() string {
if v == nil {
return ""
}
return v.ID
}
// GetNamespace implements the NamespaceGetter interface, required for
// pagination. A nil receiver yields the empty string.
func (v *CSIVolume) GetNamespace() string {
	if v != nil {
		return v.Namespace
	}
	return ""
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (v *CSIVolume) GetCreateIndex() uint64 {
	if v != nil {
		return v.CreateIndex
	}
	return 0
}
// CSIVolListStub is partial representation of a CSI Volume for inclusion in lists
type CSIVolListStub struct {
ID string

View File

@ -273,8 +273,8 @@ type QueryOptions struct {
// previous response.
NextToken string
// Ascending is used to have results sorted in ascending chronological order.
Ascending bool
// Reverse is used to reverse the default order of list results.
Reverse bool
InternalRpcInfo
}
@ -4188,6 +4188,33 @@ func (j *Job) NamespacedID() NamespacedID {
}
}
// GetID implements the IDGetter interface, required for pagination.
// A nil receiver yields the empty string.
func (j *Job) GetID() string {
	if j != nil {
		return j.ID
	}
	return ""
}
// GetNamespace implements the NamespaceGetter interface, required for
// pagination and filtering namespaces in endpoints that support glob namespace
// requests using tokens with limited access. A nil receiver yields the empty
// string.
func (j *Job) GetNamespace() string {
	if j != nil {
		return j.Namespace
	}
	return ""
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (j *Job) GetCreateIndex() uint64 {
	if j != nil {
		return j.CreateIndex
	}
	return 0
}
// Canonicalize is used to canonicalize fields in the Job. This should be
// called when registering a Job.
func (j *Job) Canonicalize() {
@ -9078,6 +9105,15 @@ func (d *Deployment) GetID() string {
return d.ID
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (d *Deployment) GetCreateIndex() uint64 {
	if d != nil {
		return d.CreateIndex
	}
	return 0
}
// HasPlacedCanaries returns whether the deployment has placed canaries
func (d *Deployment) HasPlacedCanaries() bool {
if d == nil || len(d.TaskGroups) == 0 {
@ -9467,6 +9503,33 @@ type Allocation struct {
ModifyTime int64
}
// GetID implements the IDGetter interface, required for pagination.
// A nil receiver yields the empty string.
func (a *Allocation) GetID() string {
	if a != nil {
		return a.ID
	}
	return ""
}
// GetNamespace implements the NamespaceGetter interface, required for
// pagination and filtering namespaces in endpoints that support glob namespace
// requests using tokens with limited access. A nil receiver yields the empty
// string.
func (a *Allocation) GetNamespace() string {
	if a != nil {
		return a.Namespace
	}
	return ""
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (a *Allocation) GetCreateIndex() uint64 {
	if a != nil {
		return a.CreateIndex
	}
	return 0
}
// ConsulNamespace returns the Consul namespace of the task group associated
// with this allocation.
func (a *Allocation) ConsulNamespace() string {
@ -10569,6 +10632,23 @@ type Evaluation struct {
ModifyTime int64
}
// GetID implements the IDGetter interface, required for pagination.
// A nil receiver yields the empty string.
func (e *Evaluation) GetID() string {
	if e != nil {
		return e.ID
	}
	return ""
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (e *Evaluation) GetCreateIndex() uint64 {
	if e != nil {
		return e.CreateIndex
	}
	return 0
}
// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (e *Evaluation) TerminalStatus() bool {
@ -11339,6 +11419,23 @@ type ACLToken struct {
ModifyIndex uint64
}
// GetID implements the IDGetter interface, required for pagination.
// The token's AccessorID serves as its pagination ID; a nil receiver
// yields the empty string.
func (a *ACLToken) GetID() string {
	if a != nil {
		return a.AccessorID
	}
	return ""
}
// GetCreateIndex implements the CreateIndexGetter interface, required for
// pagination. A nil receiver yields zero.
func (a *ACLToken) GetCreateIndex() uint64 {
	if a != nil {
		return a.CreateIndex
	}
	return 0
}
func (a *ACLToken) Copy() *ACLToken {
c := new(ACLToken)
*c = *a