api: return sorted results in certain list endpoints

These API endpoints now return results in chronological order. They
can return results in reverse chronological order by setting the
query parameter ascending=true.

- Eval.List
- Deployment.List
This commit is contained in:
Seth Hoenig 2022-02-10 11:50:34 -06:00
parent 15205b5408
commit 40c714a681
20 changed files with 485 additions and 118 deletions

3
.changelog/12054.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
api: sort return values of evaluation and deployment list api endpoints by creation index
```

View File

@ -75,6 +75,11 @@ type QueryOptions struct {
// previous response.
NextToken string
// Ascending is used to have results sorted in ascending chronological order.
//
// Currently only supported by evaluations.List and deployments.list endpoints.
Ascending bool
// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
@ -587,6 +592,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.NextToken != "" {
r.params.Set("next_token", q.NextToken)
}
if q.Ascending {
r.params.Set("ascending", "true")
}
for k, v := range q.Params {
r.params.Set(k, v)
}

View File

@ -181,27 +181,25 @@ func TestSetQueryOptions(t *testing.T) {
WaitIndex: 1000,
WaitTime: 100 * time.Second,
AuthToken: "foobar",
Ascending: true,
}
r.setQueryOptions(q)
if r.params.Get("region") != "foo" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("namespace") != "bar" {
t.Fatalf("bad: %v", r.params)
}
if _, ok := r.params["stale"]; !ok {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("index") != "1000" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("wait") != "100000ms" {
t.Fatalf("bad: %v", r.params)
}
if r.token != "foobar" {
t.Fatalf("bad: %v", r.token)
try := func(key, exp string) {
result := r.params.Get(key)
require.Equal(t, exp, result)
}
// Check auth token is set
require.Equal(t, "foobar", r.token)
// Check query parameters are set
try("region", "foo")
try("namespace", "bar")
try("stale", "") // should not be present
try("index", "1000")
try("wait", "100000ms")
try("ascending", "true")
}
func TestQueryOptionsContext(t *testing.T) {

View File

@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ
return nil, nil
}
query := req.URL.Query()
args.OrderAscending = query.Get("ascending") == "true"
var out structs.DeploymentListResponse
if err := s.agent.RPC("Deployment.List", &args, &out); err != nil {
return nil, err

View File

@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")
args.OrderAscending = query.Get("ascending") == "true"
var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {

View File

@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Evals": toArray(state.Evals(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),

View File

@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
iter, err := c.snap.Evals(ws, false)
if err != nil {
return err
}
@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws)
iter, err := c.snap.Deployments(ws, false)
if err != nil {
return err
}

View File

@ -7,7 +7,6 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -391,11 +390,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
}
defer metrics.MeasureSince([]string{"nomad", "deployment", "list"}, time.Now())
namespace := args.RequestNamespace()
// Check namespace read-job permissions against request namespace since
// results are filtered by request namespace.
if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
@ -407,12 +408,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Deployments(ws)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace())
iter, err = store.Deployments(ws, args.OrderAscending)
}
if err != nil {
return err

View File

@ -8,6 +8,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -1031,6 +1032,95 @@ func TestDeploymentEndpoint_List(t *testing.T) {
assert.Len(resp.Deployments, 2, "Deployments")
}
func TestDeploymentEndpoint_List_order(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create register requests
uuid1 := uuid.Generate()
dep1 := mock.Deployment()
dep1.ID = uuid1
uuid2 := uuid.Generate()
dep2 := mock.Deployment()
dep2.ID = uuid2
uuid3 := uuid.Generate()
dep3 := mock.Deployment()
dep3.ID = uuid3
err := s1.fsm.State().UpsertDeployment(1000, dep1)
require.NoError(t, err)
err = s1.fsm.State().UpsertDeployment(1001, dep2)
require.NoError(t, err)
err = s1.fsm.State().UpsertDeployment(1002, dep3)
require.NoError(t, err)
// update dep2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertDeployment(1003, dep2)
require.NoError(t, err)
t.Run("ascending", func(t *testing.T) {
// Lookup the deployments in chronological order (oldest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}
var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)
// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[0].ID)
require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)
require.Equal(t, uint64(1002), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[2].ID)
})
t.Run("descending", func(t *testing.T) {
// Lookup the deployments in reverse chronological order (newest first)
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}
var resp structs.DeploymentListResponse
err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Deployments, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Deployments[0].CreateIndex)
require.Equal(t, uuid3, resp.Deployments[0].ID)
require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex)
require.Equal(t, uuid2, resp.Deployments[1].ID)
require.Equal(t, uint64(1000), resp.Deployments[2].CreateIndex)
require.Equal(t, uuid1, resp.Deployments[2].ID)
})
}
func TestDeploymentEndpoint_List_ACL(t *testing.T) {
t.Parallel()
@ -1174,23 +1264,23 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, // 6
}
state := s1.fsm.State()
index := uint64(1000)
for _, m := range mocks {
index++
for i, m := range mocks {
index := 1000 + uint64(i)
deployment := mock.Deployment()
deployment.Status = structs.DeploymentStatusCancelled
deployment.ID = m.id
deployment.CreateIndex = index
if m.namespace != "" { // defaults to "default"
deployment.Namespace = m.namespace
}
@ -1262,6 +1352,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &structs.DeploymentListRequest{
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,

View File

@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Deployments(ws)
iter, err := state.Deployments(ws, false)
if err != nil {
return nil, 0, err
}

View File

@ -382,17 +382,18 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
}
// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now())
namespace := args.RequestNamespace()
// Check for read-job permissions
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
@ -404,12 +405,13 @@ func (e *Eval) List(args *structs.EvalListRequest,
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Evals(ws)
} else if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
return err

View File

@ -715,6 +715,122 @@ func TestEvalEndpoint_List(t *testing.T) {
if len(resp2.Evaluations) != 1 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}
func TestEvalEndpoint_List_order(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create register requests
uuid1 := uuid.Generate()
eval1 := mock.Eval()
eval1.ID = uuid1
uuid2 := uuid.Generate()
eval2 := mock.Eval()
eval2.ID = uuid2
uuid3 := uuid.Generate()
eval3 := mock.Eval()
eval3.ID = uuid3
err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})
require.NoError(t, err)
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval2})
require.NoError(t, err)
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval3})
require.NoError(t, err)
// update eval2 again so we can later assert create index order did not change
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
require.NoError(t, err)
t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}
var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)
require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)
require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})
t.Run("ascending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order (newest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}
var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)
// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[0].ID)
require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)
require.Equal(t, uint64(1002), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[2].ID)
})
t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in chronological order (oldest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}
var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)
// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)
require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)
require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})
}
@ -895,26 +1011,28 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
// create a set of evals and field values to filter on. these are
// in the order that the state store will return them from the
// iterator (sorted by key), for ease of writing tests
// iterator (sorted by create index), for ease of writing tests
mocks := []struct {
id string
namespace string
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 6
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 7
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, // 8
}
mockEvals := []*structs.Evaluation{}
for _, m := range mocks {
state := s1.fsm.State()
var evals []*structs.Evaluation
for i, m := range mocks {
eval := mock.Eval()
eval.ID = m.id
if m.namespace != "" { // defaults to "default"
@ -926,12 +1044,11 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
if m.status != "" { // defaults to "pending"
eval.Status = m.status
}
mockEvals = append(mockEvals, eval)
evals = append(evals, eval)
index := 1000 + uint64(i)
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval}))
}
state := s1.fsm.State()
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1000, mockEvals))
aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read",
mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)).
SecretID
@ -948,13 +1065,13 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
expectedIDs []string
}{
{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedIDs: []string{ // first two items
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace
},
{
name: "test02 size-2 page-1 default NS with prefix",
@ -1025,8 +1142,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
},
},
{
name: "test08 size-2 page-2 filter skip nextToken",
pageSize: 3, // reads off the end
name: "test08 size-2 page-2 filter skip nextToken", //
pageSize: 3, // reads off the end
filterJobID: "example",
filterStatus: "pending",
nextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
@ -1084,6 +1201,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
req := &structs.EvalListRequest{
FilterJobID: tc.filterJobID,
FilterEvalStatus: tc.filterStatus,
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,

View File

@ -1708,7 +1708,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
iter, err := state.Deployments(nil, false)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}
@ -2071,7 +2071,7 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
ws := memdb.NewWatchSet()
evals, err := s.snap.Evals(ws)
evals, err := s.snap.Evals(ws, false)
if err != nil {
return err
}
@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws)
deployments, err := s.snap.Deployments(ws, false)
if err != nil {
return err
}

View File

@ -503,7 +503,7 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru
func (s *Server) restoreEvals() error {
// Get an iterator over every evaluation
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().Evals(ws)
iter, err := s.fsm.State().Evals(ws, false)
if err != nil {
return fmt.Errorf("failed to get evaluations: %v", err)
}

View File

@ -303,6 +303,7 @@ func deploymentSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "deployment",
Indexes: map[string]*memdb.IndexSchema{
// id index is used for direct lookup of an deployment by ID.
"id": {
Name: "id",
AllowMissing: false,
@ -312,6 +313,20 @@ func deploymentSchema() *memdb.TableSchema {
},
},
// create index is used for listing deploy, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
//
// There may be more than one deployment per CreateIndex.
"create": {
Name: "create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
// namespace is used to lookup evaluations by namespace.
"namespace": {
Name: "namespace",
AllowMissing: false,
@ -321,7 +336,31 @@ func deploymentSchema() *memdb.TableSchema {
},
},
// Job index is used to lookup deployments by job
// namespace_create index is used to lookup deployments by namespace
// in their original chronological order based on CreateIndex.
//
// Use a prefix iterator (namespace_create_prefix) to iterate deployments
// of a Namespace in order of CreateIndex.
//
// There may be more than one deployment per CreateIndex.
"namespace_create": {
Name: "namespace_create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
AllowMissing: false,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
},
// job index is used to lookup deployments by job
"job": {
Name: "job",
AllowMissing: false,
@ -384,7 +423,7 @@ func evalTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "evals",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for direct lookup.
// id index is used for direct lookup of an evaluation by ID.
"id": {
Name: "id",
AllowMissing: false,
@ -394,16 +433,18 @@ func evalTableSchema() *memdb.TableSchema {
},
},
"namespace": {
Name: "namespace",
// create index is used for listing evaluations, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
"create": {
Name: "create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Namespace",
Indexer: &memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
// Job index is used to lookup allocations by job
// job index is used to lookup evaluations by job ID.
"job": {
Name: "job",
AllowMissing: false,
@ -426,6 +467,38 @@ func evalTableSchema() *memdb.TableSchema {
},
},
},
// namespace is used to lookup evaluations by namespace.
"namespace": {
Name: "namespace",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Namespace",
},
},
// namespace_create index is used to lookup evaluations by namespace
// in their original chronological order based on CreateIndex.
//
// Use a prefix iterator (namespace_prefix) on a Namespace to iterate
// those evaluations in order of CreateIndex.
"namespace_create": {
Name: "namespace_create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
AllowMissing: false,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
},
},
}
}

View File

@ -32,6 +32,14 @@ const (
NodeRegisterEventReregistered = "Node re-registered"
)
// terminate appends the go-memdb terminator character to s.
//
// We can then use the result for exact matches during prefix
// scans over compound indexes that start with s.
func terminate(s string) string {
return s + "\x00"
}
// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
@ -536,17 +544,25 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
return nil
}
func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire deployments table
iter, err := txn.Get("deployment", "id")
var it memdb.ResultIterator
var err error
if ascending {
it, err = txn.Get("deployment", "create")
} else {
it, err = txn.GetReverse("deployment", "create")
}
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
ws.Add(it.WatchCh())
return it, nil
}
func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
@ -562,6 +578,30 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string)
return iter, nil
}
func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var (
it memdb.ResultIterator
err error
exact = terminate(namespace)
)
if ascending {
it, err = txn.Get("deployment", "namespace_create_prefix", exact)
} else {
it, err = txn.GetReverse("deployment", "namespace_create_prefix", exact)
}
if err != nil {
return nil, err
}
ws.Add(it.WatchCh())
return it, nil
}
func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
@ -3112,35 +3152,68 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*
return out, nil
}
// Evals returns an iterator over all the evaluations
func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) {
// Evals returns an iterator over all the evaluations in ascending or descending
// order of CreationIndex as determined by the ascending parameter.
func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("evals", "id")
var it memdb.ResultIterator
var err error
if ascending {
it, err = txn.Get("evals", "create")
} else {
it, err = txn.GetReverse("evals", "create")
}
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
ws.Add(it.WatchCh())
return iter, nil
return it, nil
}
// EvalsByNamespace returns an iterator over all the evaluations in the given
// namespace
// EvalsByNamespace returns an iterator over all evaluations in no particular
// order.
//
// todo(shoenig): can this be removed?
func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table
iter, err := txn.Get("evals", "namespace", namespace)
it, err := txn.Get("evals", "namespace", namespace)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
ws.Add(it.WatchCh())
return iter, nil
return it, nil
}
func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
var (
it memdb.ResultIterator
err error
exact = terminate(namespace)
)
if ascending {
it, err = txn.Get("evals", "namespace_create_prefix", exact)
} else {
it, err = txn.GetReverse("evals", "namespace_create_prefix", exact)
}
if err != nil {
return nil, err
}
ws.Add(it.WatchCh())
return it, nil
}
// UpdateAllocsFromClient is used to update an allocation based on input

View File

@ -666,39 +666,24 @@ func TestStateStore_Deployments(t *testing.T) {
deployments = append(deployments, deployment)
err := state.UpsertDeployment(1000+uint64(i), deployment)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, err)
}
ws := memdb.NewWatchSet()
iter, err := state.Deployments(ws)
if err != nil {
t.Fatalf("err: %v", err)
}
it, err := state.Deployments(ws, true)
require.NoError(t, err)
var out []*structs.Deployment
for {
raw := iter.Next()
raw := it.Next()
if raw == nil {
break
}
out = append(out, raw.(*structs.Deployment))
}
lessThan := func(i, j int) bool {
return deployments[i].ID < deployments[j].ID
}
sort.Slice(deployments, lessThan)
sort.Slice(out, lessThan)
if !reflect.DeepEqual(deployments, out) {
t.Fatalf("bad: %#v %#v", deployments, out)
}
if watchFired(ws) {
t.Fatalf("bad")
}
require.Equal(t, deployments, out)
require.False(t, watchFired(ws))
}
func TestStateStore_DeploymentsByIDPrefix(t *testing.T) {
@ -3827,7 +3812,7 @@ func TestStateStore_Evals(t *testing.T) {
}
ws := memdb.NewWatchSet()
iter, err := state.Evals(ws)
iter, err := state.Evals(ws, false)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -859,6 +859,7 @@ type EvalDequeueRequest struct {
type EvalListRequest struct {
FilterJobID string
FilterEvalStatus string
OrderAscending bool
QueryOptions
}
@ -1093,6 +1094,7 @@ type GenericRequest struct {
// DeploymentListRequest is used to list the deployments
type DeploymentListRequest struct {
OrderAscending bool
QueryOptions
}

View File

@ -46,6 +46,10 @@ The table below shows this endpoint's support for
used as the `last_token` of the next request to fetch additional
pages.
- `ascending` `(bool: false)` - Specifies the list of returned deployments should
be sorted in chronological order (oldest evaluations first). By default deployments
are returned sorted in reverse chronological order (newest deployments first).
### Sample Request
```shell-session

View File

@ -53,6 +53,10 @@ The table below shows this endpoint's support for
namespace. Specifying `*` will return all evaluations across all
authorized namespaces.
- `ascending` `(bool: false)` - Specifies the list of returned evaluations should
be sorted in chronological order (oldest evaluations first). By default evaluations
are returned sorted in reverse chronological order (newest evaluations first).
### Sample Request
```shell-session