diff --git a/.changelog/12054.txt b/.changelog/12054.txt new file mode 100644 index 000000000..441f687b5 --- /dev/null +++ b/.changelog/12054.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: sort return values of evaluation and deployment list api endpoints by creation index +``` diff --git a/api/api.go b/api/api.go index d8b90cc8d..1dfc19205 100644 --- a/api/api.go +++ b/api/api.go @@ -75,6 +75,11 @@ type QueryOptions struct { // previous response. NextToken string + // Ascending is used to have results sorted in ascending chronological order. + // + // Currently only supported by evaluations.List and deployments.list endpoints. + Ascending bool + // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. ctx context.Context @@ -587,6 +592,9 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.NextToken != "" { r.params.Set("next_token", q.NextToken) } + if q.Ascending { + r.params.Set("ascending", "true") + } for k, v := range q.Params { r.params.Set(k, v) } diff --git a/api/api_test.go b/api/api_test.go index 3f83e716c..0f503c624 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -181,27 +181,25 @@ func TestSetQueryOptions(t *testing.T) { WaitIndex: 1000, WaitTime: 100 * time.Second, AuthToken: "foobar", + Ascending: true, } r.setQueryOptions(q) - if r.params.Get("region") != "foo" { - t.Fatalf("bad: %v", r.params) - } - if r.params.Get("namespace") != "bar" { - t.Fatalf("bad: %v", r.params) - } - if _, ok := r.params["stale"]; !ok { - t.Fatalf("bad: %v", r.params) - } - if r.params.Get("index") != "1000" { - t.Fatalf("bad: %v", r.params) - } - if r.params.Get("wait") != "100000ms" { - t.Fatalf("bad: %v", r.params) - } - if r.token != "foobar" { - t.Fatalf("bad: %v", r.token) + try := func(key, exp string) { + result := r.params.Get(key) + require.Equal(t, exp, result) } + + // Check auth token is set + require.Equal(t, "foobar", r.token) + + // Check query parameters are set + try("region", "foo") + try("namespace", "bar") + try("stale", "") // should not be present + try("index", "1000") + try("wait", "100000ms") + try("ascending", "true") } func TestQueryOptionsContext(t *testing.T) { diff --git a/command/agent/deployment_endpoint.go b/command/agent/deployment_endpoint.go index 58829adce..729f95fd3 100644 --- a/command/agent/deployment_endpoint.go +++ b/command/agent/deployment_endpoint.go @@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ return nil, nil } + query := req.URL.Query() + args.OrderAscending = query.Get("ascending") == "true" + var out structs.DeploymentListResponse if err := s.agent.RPC("Deployment.List", &args, &out); err != nil { return nil, err diff --git a/command/agent/eval_endpoint.go b/command/agent/eval_endpoint.go index a51c9e940..3494b8508 100644 --- a/command/agent/eval_endpoint.go +++ b/command/agent/eval_endpoint.go @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) ( query := req.URL.Query() args.FilterEvalStatus = query.Get("status") args.FilterJobID = query.Get("job") + args.OrderAscending = query.Get("ascending") == "true" var out structs.EvalListResponse if err := s.agent.RPC("Eval.List", &args, &out); err != nil { diff --git a/helper/raftutil/fsm.go b/helper/raftutil/fsm.go index 3410b5ffb..2d27f7972 100644 --- a/helper/raftutil/fsm.go +++ b/helper/raftutil/fsm.go @@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} { "Allocs": toArray(state.Allocs(nil)), "CSIPlugins": toArray(state.CSIPlugins(nil)), "CSIVolumes": toArray(state.CSIVolumes(nil)), - "Deployments": toArray(state.Deployments(nil)), - "Evals": toArray(state.Evals(nil)), + "Deployments": toArray(state.Deployments(nil, false)), + "Evals": toArray(state.Evals(nil, false)), "Indexes": toArray(state.Indexes()), "JobSummaries": toArray(state.JobSummaries(nil)), "JobVersions": toArray(state.JobVersions(nil)), diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 17925ecc7..e4dbaf82a 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { // Iterate over the evaluations ws := memdb.NewWatchSet() - iter, err := c.snap.Evals(ws) + iter, err := c.snap.Evals(ws, false) if err != nil { return err } @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error { // Iterate over the deployments ws := memdb.NewWatchSet() - iter, err := c.snap.Deployments(ws) + iter, err := c.snap.Deployments(ws, false) if err != nil { return err } diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 2c18de98d..ce8c82e0b 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -7,7 +7,6 @@ import ( metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -391,11 +390,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De } defer metrics.MeasureSince([]string{"nomad", "deployment", "list"}, time.Now()) + namespace := args.RequestNamespace() + // Check namespace read-job permissions against request namespace since // results are filtered by request namespace. if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { return err - } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + } else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) { return structs.ErrPermissionDenied } @@ -407,12 +408,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De // Capture all the deployments var err error var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { - iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix) - } else if args.RequestNamespace() == structs.AllNamespacesSentinel { - iter, err = store.Deployments(ws) + iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix) + } else if namespace != structs.AllNamespacesSentinel { + iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending) } else { - iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace()) + iter, err = store.Deployments(ws, args.OrderAscending) } if err != nil { return err diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index 9868fb021..867b3e866 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -8,6 +8,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -1031,6 +1032,95 @@ func TestDeploymentEndpoint_List(t *testing.T) { assert.Len(resp.Deployments, 2, "Deployments") } +func TestDeploymentEndpoint_List_order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + dep1 := mock.Deployment() + dep1.ID = uuid1 + + uuid2 := uuid.Generate() + dep2 := mock.Deployment() + dep2.ID = uuid2 + + uuid3 := uuid.Generate() + dep3 := mock.Deployment() + dep3.ID = uuid3 + + err := s1.fsm.State().UpsertDeployment(1000, dep1) + require.NoError(t, err) + + err = s1.fsm.State().UpsertDeployment(1001, dep2) + require.NoError(t, err) + + err = s1.fsm.State().UpsertDeployment(1002, dep3) + require.NoError(t, err) + + // update dep2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertDeployment(1003, dep2) + require.NoError(t, err) + + t.Run("ascending", func(t *testing.T) { + // Lookup the deployments in chronological order (oldest first) + get := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: true, + } + + var resp structs.DeploymentListResponse + err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Deployments, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Deployments[0].CreateIndex) + require.Equal(t, uuid1, resp.Deployments[0].ID) + + require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex) + require.Equal(t, uuid2, resp.Deployments[1].ID) + + require.Equal(t, uint64(1002), resp.Deployments[2].CreateIndex) + require.Equal(t, uuid3, resp.Deployments[2].ID) + }) + + t.Run("descending", func(t *testing.T) { + // Lookup the deployments in reverse chronological order (newest first) + get := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.DeploymentListResponse + err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Deployments, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Deployments[0].CreateIndex) + require.Equal(t, uuid3, resp.Deployments[0].ID) + + require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex) + require.Equal(t, uuid2, resp.Deployments[1].ID) + + require.Equal(t, uint64(1000), resp.Deployments[2].CreateIndex) + require.Equal(t, uuid1, resp.Deployments[2].ID) + }) +} + func TestDeploymentEndpoint_List_ACL(t *testing.T) { t.Parallel() @@ -1174,23 +1264,23 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { jobID string status string }{ - {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, - {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, + {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, // 0 + {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, // 1 + {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2 + {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, // 3 + {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4 + {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5 + {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, // 6 } state := s1.fsm.State() - index := uint64(1000) - for _, m := range mocks { - index++ + for i, m := range mocks { + index := 1000 + uint64(i) deployment := mock.Deployment() deployment.Status = structs.DeploymentStatusCancelled deployment.ID = m.id + deployment.CreateIndex = index if m.namespace != "" { // defaults to "default" deployment.Namespace = m.namespace } @@ -1262,6 +1352,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { req := &structs.DeploymentListRequest{ + OrderAscending: true, // counting up is easier to think about QueryOptions: structs.QueryOptions{ Region: "global", Namespace: tc.namespace, diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index d42128321..8743f2b12 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D // getDeploysImpl retrieves all deployments from the passed state store. func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { - iter, err := state.Deployments(ws) + iter, err := state.Deployments(ws, false) if err != nil { return nil, 0, err } diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 8a48e27c1..1c6408c52 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -382,17 +382,18 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest, } // List is used to get a list of the evaluations in the system -func (e *Eval) List(args *structs.EvalListRequest, - reply *structs.EvalListResponse) error { +func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error { if done, err := e.srv.forward("Eval.List", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now()) + namespace := args.RequestNamespace() + // Check for read-job permissions if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { return err - } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + } else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob) { return structs.ErrPermissionDenied } @@ -404,12 +405,13 @@ func (e *Eval) List(args *structs.EvalListRequest, // Scan all the evaluations var err error var iter memdb.ResultIterator - if args.RequestNamespace() == structs.AllNamespacesSentinel { - iter, err = store.Evals(ws) - } else if prefix := args.QueryOptions.Prefix; prefix != "" { - iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix) + + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = store.EvalsByIDPrefix(ws, namespace, prefix) + } else if namespace != structs.AllNamespacesSentinel { + iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending) } else { - iter, err = store.EvalsByNamespace(ws, args.RequestNamespace()) + iter, err = store.Evals(ws, args.OrderAscending) } if err != nil { return err diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 2b78c6b72..e2cef6904 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -715,6 +715,122 @@ func TestEvalEndpoint_List(t *testing.T) { if len(resp2.Evaluations) != 1 { t.Fatalf("bad: %#v", resp2.Evaluations) } +} + +func TestEvalEndpoint_List_order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + eval1 := mock.Eval() + eval1.ID = uuid1 + + uuid2 := uuid.Generate() + eval2 := mock.Eval() + eval2.ID = uuid2 + + uuid3 := uuid.Generate() + eval3 := mock.Eval() + eval3.ID = uuid3 + + err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval2}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval3}) + require.NoError(t, err) + + // update eval2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2}) + require.NoError(t, err) + + t.Run("descending", func(t *testing.T) { + // Lookup the evaluations in reverse chronological order + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[2].ID) + }) + + t.Run("ascending", func(t *testing.T) { + // Lookup the evaluations in reverse chronological order (newest first) + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: true, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1002), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[2].ID) + }) + + t.Run("descending", func(t *testing.T) { + // Lookup the evaluations in chronological order (oldest first) + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[2].ID) + }) } @@ -895,26 +1011,28 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { // create a set of evals and field values to filter on. these are // in the order that the state store will return them from the - // iterator (sorted by key), for ease of writing tests + // iterator (sorted by create index), for ease of writing tests mocks := []struct { id string namespace string jobID string status string }{ - {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, - {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"}, - {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, + {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 0 + {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 1 + {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2 + {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"}, // 3 + {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4 + {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5 + {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 6 + {id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 7 + {id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, // 8 } - mockEvals := []*structs.Evaluation{} - for _, m := range mocks { + state := s1.fsm.State() + + var evals []*structs.Evaluation + for i, m := range mocks { eval := mock.Eval() eval.ID = m.id if m.namespace != "" { // defaults to "default" @@ -926,12 +1044,11 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { if m.status != "" { // defaults to "pending" eval.Status = m.status } - mockEvals = append(mockEvals, eval) + evals = append(evals, eval) + index := 1000 + uint64(i) + require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval})) } - state := s1.fsm.State() - require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1000, mockEvals)) - aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read", mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)). SecretID @@ -948,13 +1065,13 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { expectedIDs []string }{ { - name: "test01 size-2 page-1 default NS", - pageSize: 2, - expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", - expectedIDs: []string{ + name: "test01 size-2 page-1 default NS", + pageSize: 2, + expectedIDs: []string{ // first two items "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", }, + expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace }, { name: "test02 size-2 page-1 default NS with prefix", @@ -1025,8 +1142,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { }, }, { - name: "test08 size-2 page-2 filter skip nextToken", - pageSize: 3, // reads off the end + name: "test08 size-2 page-2 filter skip nextToken", // + pageSize: 3, // reads off the end filterJobID: "example", filterStatus: "pending", nextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", @@ -1084,6 +1201,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { req := &structs.EvalListRequest{ FilterJobID: tc.filterJobID, FilterEvalStatus: tc.filterStatus, + OrderAscending: true, // counting up is easier to think about QueryOptions: structs.QueryOptions{ Region: "global", Namespace: tc.namespace, diff --git a/nomad/fsm.go b/nomad/fsm.go index 90dba3231..6fcd0a044 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1708,7 +1708,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { // Scan for deployments that are referencing a job that no longer exists. // This could happen if multiple deployments were created for a given job // and thus the older deployment leaks and then the job is removed. - iter, err := state.Deployments(nil) + iter, err := state.Deployments(nil, false) if err != nil { return fmt.Errorf("failed to query deployments: %v", err) } @@ -2071,7 +2071,7 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the evaluations ws := memdb.NewWatchSet() - evals, err := s.snap.Evals(ws) + evals, err := s.snap.Evals(ws, false) if err != nil { return err } @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the jobs ws := memdb.NewWatchSet() - deployments, err := s.snap.Deployments(ws) + deployments, err := s.snap.Deployments(ws, false) if err != nil { return err } diff --git a/nomad/leader.go b/nomad/leader.go index ecffabc85..7facb66c7 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -503,7 +503,7 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru func (s *Server) restoreEvals() error { // Get an iterator over every evaluation ws := memdb.NewWatchSet() - iter, err := s.fsm.State().Evals(ws) + iter, err := s.fsm.State().Evals(ws, false) if err != nil { return fmt.Errorf("failed to get evaluations: %v", err) } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index da647970b..eb6805f04 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -303,6 +303,7 @@ func deploymentSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "deployment", Indexes: map[string]*memdb.IndexSchema{ + // id index is used for direct lookup of an deployment by ID. "id": { Name: "id", AllowMissing: false, @@ -312,6 +313,20 @@ func deploymentSchema() *memdb.TableSchema { }, }, + // create index is used for listing deploy, ordering them by + // creation chronology. (Use a reverse iterator for newest first). + // + // There may be more than one deployment per CreateIndex. + "create": { + Name: "create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + + // namespace is used to lookup evaluations by namespace. "namespace": { Name: "namespace", AllowMissing: false, @@ -321,7 +336,31 @@ func deploymentSchema() *memdb.TableSchema { }, }, - // Job index is used to lookup deployments by job + // namespace_create index is used to lookup deployments by namespace + // in their original chronological order based on CreateIndex. + // + // Use a prefix iterator (namespace_create_prefix) to iterate deployments + // of a Namespace in order of CreateIndex. + // + // There may be more than one deployment per CreateIndex. + "namespace_create": { + Name: "namespace_create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.CompoundIndex{ + AllowMissing: false, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + }, + }, + + // job index is used to lookup deployments by job "job": { Name: "job", AllowMissing: false, @@ -384,7 +423,7 @@ func evalTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "evals", Indexes: map[string]*memdb.IndexSchema{ - // Primary index is used for direct lookup. + // id index is used for direct lookup of an evaluation by ID. "id": { Name: "id", AllowMissing: false, @@ -394,16 +433,18 @@ func evalTableSchema() *memdb.TableSchema { }, }, - "namespace": { - Name: "namespace", + // create index is used for listing evaluations, ordering them by + // creation chronology. (Use a reverse iterator for newest first). + "create": { + Name: "create", AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Namespace", + Indexer: &memdb.UintFieldIndex{ + Field: "CreateIndex", }, }, - // Job index is used to lookup allocations by job + // job index is used to lookup evaluations by job ID. "job": { Name: "job", AllowMissing: false, @@ -426,6 +467,38 @@ func evalTableSchema() *memdb.TableSchema { }, }, }, + + // namespace is used to lookup evaluations by namespace. + "namespace": { + Name: "namespace", + AllowMissing: false, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "Namespace", + }, + }, + + // namespace_create index is used to lookup evaluations by namespace + // in their original chronological order based on CreateIndex. + // + // Use a prefix iterator (namespace_prefix) on a Namespace to iterate + // those evaluations in order of CreateIndex. + "namespace_create": { + Name: "namespace_create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.CompoundIndex{ + AllowMissing: false, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f32fd3a38..7e9edeeeb 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -32,6 +32,14 @@ const ( NodeRegisterEventReregistered = "Node re-registered" ) +// terminate appends the go-memdb terminator character to s. +// +// We can then use the result for exact matches during prefix +// scans over compound indexes that start with s. +func terminate(s string) string { + return s + "\x00" +} + // IndexEntry is used with the "index" table // for managing the latest Raft index affecting a table. type IndexEntry struct { @@ -536,17 +544,25 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return nil } -func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) { +func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire deployments table - iter, err := txn.Get("deployment", "id") + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("deployment", "create") + } else { + it, err = txn.GetReverse("deployment", "create") + } + if err != nil { return nil, err } - ws.Add(iter.WatchCh()) - return iter, nil + ws.Add(it.WatchCh()) + + return it, nil } func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { @@ -562,6 +578,30 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) return iter, nil } +func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + var ( + it memdb.ResultIterator + err error + exact = terminate(namespace) + ) + + if ascending { + it, err = txn.Get("deployment", "namespace_create_prefix", exact) + } else { + it, err = txn.GetReverse("deployment", "namespace_create_prefix", exact) + } + + if err != nil { + return nil, err + } + + ws.Add(it.WatchCh()) + + return it, nil +} + func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -3112,35 +3152,68 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* return out, nil } -// Evals returns an iterator over all the evaluations -func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { +// Evals returns an iterator over all the evaluations in ascending or descending +// order of CreationIndex as determined by the ascending parameter. +func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("evals", "id") + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("evals", "create") + } else { + it, err = txn.GetReverse("evals", "create") + } + if err != nil { return nil, err } - ws.Add(iter.WatchCh()) + ws.Add(it.WatchCh()) - return iter, nil + return it, nil } -// EvalsByNamespace returns an iterator over all the evaluations in the given -// namespace +// EvalsByNamespace returns an iterator over all evaluations in no particular +// order. +// +// todo(shoenig): can this be removed? func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("evals", "namespace", namespace) + it, err := txn.Get("evals", "namespace", namespace) if err != nil { return nil, err } - ws.Add(iter.WatchCh()) + ws.Add(it.WatchCh()) - return iter, nil + return it, nil +} + +func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + var ( + it memdb.ResultIterator + err error + exact = terminate(namespace) + ) + + if ascending { + it, err = txn.Get("evals", "namespace_create_prefix", exact) + } else { + it, err = txn.GetReverse("evals", "namespace_create_prefix", exact) + } + + if err != nil { + return nil, err + } + + ws.Add(it.WatchCh()) + + return it, nil } // UpdateAllocsFromClient is used to update an allocation based on input diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 561765d94..80b98dfa7 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -666,39 +666,24 @@ func TestStateStore_Deployments(t *testing.T) { deployments = append(deployments, deployment) err := state.UpsertDeployment(1000+uint64(i), deployment) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(t, err) } ws := memdb.NewWatchSet() - iter, err := state.Deployments(ws) - if err != nil { - t.Fatalf("err: %v", err) - } + it, err := state.Deployments(ws, true) + require.NoError(t, err) var out []*structs.Deployment for { - raw := iter.Next() + raw := it.Next() if raw == nil { break } out = append(out, raw.(*structs.Deployment)) } - lessThan := func(i, j int) bool { - return deployments[i].ID < deployments[j].ID - } - sort.Slice(deployments, lessThan) - sort.Slice(out, lessThan) - - if !reflect.DeepEqual(deployments, out) { - t.Fatalf("bad: %#v %#v", deployments, out) - } - - if watchFired(ws) { - t.Fatalf("bad") - } + require.Equal(t, deployments, out) + require.False(t, watchFired(ws)) } func TestStateStore_DeploymentsByIDPrefix(t *testing.T) { @@ -3827,7 +3812,7 @@ func TestStateStore_Evals(t *testing.T) { } ws := memdb.NewWatchSet() - iter, err := state.Evals(ws) + iter, err := state.Evals(ws, false) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7051a26b4..65d9a8c6e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -859,6 +859,7 @@ type EvalDequeueRequest struct { type EvalListRequest struct { FilterJobID string FilterEvalStatus string + OrderAscending bool QueryOptions } @@ -1093,6 +1094,7 @@ type GenericRequest struct { // DeploymentListRequest is used to list the deployments type DeploymentListRequest struct { + OrderAscending bool QueryOptions } diff --git a/website/content/api-docs/deployments.mdx b/website/content/api-docs/deployments.mdx index a608ab357..484b5003c 100644 --- a/website/content/api-docs/deployments.mdx +++ b/website/content/api-docs/deployments.mdx @@ -46,6 +46,10 @@ The table below shows this endpoint's support for used as the `last_token` of the next request to fetch additional pages. +- `ascending` `(bool: false)` - Specifies the list of returned deployments should + be sorted in chronological order (oldest evaluations first). By default deployments + are returned sorted in reverse chronological order (newest deployments first). + ### Sample Request ```shell-session diff --git a/website/content/api-docs/evaluations.mdx b/website/content/api-docs/evaluations.mdx index ab78127ea..835eacfef 100644 --- a/website/content/api-docs/evaluations.mdx +++ b/website/content/api-docs/evaluations.mdx @@ -53,6 +53,10 @@ The table below shows this endpoint's support for namespace. Specifying `*` will return all evaluations across all authorized namespaces. +- `ascending` `(bool: false)` - Specifies the list of returned evaluations should + be sorted in chronological order (oldest evaluations first). By default evaluations + are returned sorted in reverse chronological order (newest evaluations first). + ### Sample Request ```shell-session