From c0f229551039c97e346e3f3d47c35525c50d7915 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 6 Jun 2023 11:40:13 -0400 Subject: [PATCH] node pools: implement HTTP API to list jobs in pool (#17431) Implements the HTTP API associated with the `NodePool.ListJobs` RPC, including the `api` package for the public API and documentation. Update the `NodePool.ListJobs` RPC to fix the missing handling of the special "all" pool. --- api/node_pools.go | 13 ++++ command/agent/node_pool_endpoint.go | 48 +++++++++++++ command/agent/node_pool_endpoint_test.go | 65 +++++++++++++++++ nomad/node_pool_endpoint.go | 35 +++++---- nomad/node_pool_endpoint_test.go | 33 ++++++++- website/content/api-docs/node-pools.mdx | 92 ++++++++++++++++++++++++ 6 files changed, 271 insertions(+), 15 deletions(-) diff --git a/api/node_pools.go b/api/node_pools.go index 37ecf1555..8e93cfaea 100644 --- a/api/node_pools.go +++ b/api/node_pools.go @@ -5,6 +5,7 @@ package api import ( "errors" + "fmt" "net/url" ) @@ -88,6 +89,18 @@ func (n *NodePools) Delete(name string, w *WriteOptions) (*WriteMeta, error) { return wm, nil } +// ListJobs is used to list all the jobs in a node pool. +func (n *NodePools) ListJobs(poolName string, q *QueryOptions) ([]*JobListStub, *QueryMeta, error) { + var resp []*JobListStub + qm, err := n.client.query( + fmt.Sprintf("/v1/node/pool/%s/jobs", url.PathEscape(poolName)), + &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + // NodePool is used to serialize a node pool. 
type NodePool struct { Name string `hcl:"name,label"` diff --git a/command/agent/node_pool_endpoint.go b/command/agent/node_pool_endpoint.go index 754c79318..a93a6fa7a 100644 --- a/command/agent/node_pool_endpoint.go +++ b/command/agent/node_pool_endpoint.go @@ -28,6 +28,9 @@ func (s *HTTPServer) NodePoolSpecificRequest(resp http.ResponseWriter, req *http case strings.HasSuffix(path, "/nodes"): poolName := strings.TrimSuffix(path, "/nodes") return s.nodePoolNodesList(resp, req, poolName) + case strings.HasSuffix(path, "/jobs"): + poolName := strings.TrimSuffix(path, "/jobs") + return s.nodePoolJobList(resp, req, poolName) default: return s.nodePoolCRUD(resp, req, path) } @@ -125,6 +128,10 @@ func (s *HTTPServer) nodePoolDelete(resp http.ResponseWriter, req *http.Request, } func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Request, poolName string) (interface{}, error) { + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + args := structs.NodePoolNodesRequest{ Name: poolName, } @@ -157,3 +164,44 @@ func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Reque } return out.Nodes, nil } + +func (s *HTTPServer) nodePoolJobList(resp http.ResponseWriter, req *http.Request, poolName string) (any, error) { + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + + args := structs.NodePoolJobsRequest{ + Name: poolName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + if args.Prefix != "" { + // the prefix argument is ambiguous for this endpoint (does it refer to + // the node pool name or the job names like /v1/jobs?) 
so the RPC + // handler ignores it + return nil, CodedError(http.StatusBadRequest, "prefix argument not allowed") + } + + // Parse meta query param + args.Fields = &structs.JobStubFields{} + jobMeta, err := parseBool(req, "meta") + if err != nil { + return nil, err + } + if jobMeta != nil { + args.Fields.Meta = *jobMeta + } + + var out structs.NodePoolJobsResponse + if err := s.agent.RPC("NodePool.ListJobs", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Jobs == nil { + out.Jobs = make([]*structs.JobListStub, 0) + } + return out.Jobs, nil +} diff --git a/command/agent/node_pool_endpoint_test.go b/command/agent/node_pool_endpoint_test.go index 1efe17957..ce0843374 100644 --- a/command/agent/node_pool_endpoint_test.go +++ b/command/agent/node_pool_endpoint_test.go @@ -420,3 +420,68 @@ func TestHTTP_NodePool_NodesList(t *testing.T) { } }) } + +func TestHTTP_NodePool_JobsList(t *testing.T) { + ci.Parallel(t) + httpTest(t, nil, func(s *TestAgent) { + + pool1, pool2 := mock.NodePool(), mock.NodePool() + npUpReq := structs.NodePoolUpsertRequest{ + NodePools: []*structs.NodePool{pool1, pool2}, + } + var npUpResp structs.GenericResponse + err := s.Agent.RPC("NodePool.UpsertNodePools", &npUpReq, &npUpResp) + must.NoError(t, err) + + for _, poolName := range []string{pool1.Name, "default"} { + for i := 0; i < 2; i++ { + job := mock.MinJob() + job.NodePool = poolName + jobRegReq := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var jobRegResp structs.JobRegisterResponse + must.NoError(t, s.Agent.RPC("Job.Register", &jobRegReq, &jobRegResp)) + } + } + + // Make HTTP request to occupied pool + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf("/v1/node/pool/%s/jobs", pool1.Name), nil) + must.NoError(t, err) + respW := httptest.NewRecorder() + + obj, err := s.Server.NodePoolSpecificRequest(respW, req) + must.NoError(t, err) + 
must.SliceLen(t, 2, obj.([]*structs.JobListStub)) + + // Verify response index. + gotIndex, err := strconv.ParseUint(respW.HeaderMap.Get("X-Nomad-Index"), 10, 64) + must.NoError(t, err) + must.NonZero(t, gotIndex) + + // Make HTTP request to empty pool + req, err = http.NewRequest(http.MethodGet, + fmt.Sprintf("/v1/node/pool/%s/jobs", pool2.Name), nil) + must.NoError(t, err) + respW = httptest.NewRecorder() + + obj, err = s.Server.NodePoolSpecificRequest(respW, req) + must.NoError(t, err) + must.SliceLen(t, 0, obj.([]*structs.JobListStub)) + + // Make HTTP request to the "all" pool + req, err = http.NewRequest(http.MethodGet, "/v1/node/pool/all/jobs", nil) + must.NoError(t, err) + respW = httptest.NewRecorder() + + obj, err = s.Server.NodePoolSpecificRequest(respW, req) + must.NoError(t, err) + must.SliceLen(t, 4, obj.([]*structs.JobListStub)) + + }) +} diff --git a/nomad/node_pool_endpoint.go b/nomad/node_pool_endpoint.go index 72b7fd8fa..7219b840b 100644 --- a/nomad/node_pool_endpoint.go +++ b/nomad/node_pool_endpoint.go @@ -312,21 +312,30 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No }, } - if namespace == structs.AllNamespacesSentinel { - iter, err = store.JobsByPool(ws, args.Name) + if args.Name == structs.NodePoolAll { + if namespace == structs.AllNamespacesSentinel { + iter, err = store.Jobs(ws) + } else { + iter, err = store.JobsByNamespace(ws, namespace) + } } else { - iter, err = store.JobsByNamespace(ws, namespace) - filters = append(filters, - paginator.GenericFilter{ - Allow: func(raw interface{}) (bool, error) { - job := raw.(*structs.Job) - if job == nil || job.NodePool != args.Name { - return false, nil - } - return true, nil - }, - }) + if namespace == structs.AllNamespacesSentinel { + iter, err = store.JobsByPool(ws, args.Name) + } else { + iter, err = store.JobsByNamespace(ws, namespace) + filters = append(filters, + paginator.GenericFilter{ + Allow: func(raw interface{}) (bool, error) { + job := 
raw.(*structs.Job) + if job == nil || job.NodePool != args.Name { + return false, nil + } + return true, nil + }, + }) + } } + if err != nil { return err } diff --git a/nomad/node_pool_endpoint_test.go b/nomad/node_pool_endpoint_test.go index febc6a487..7d633f7d5 100644 --- a/nomad/node_pool_endpoint_test.go +++ b/nomad/node_pool_endpoint_test.go @@ -1278,7 +1278,7 @@ func TestNodePoolEndpoint_ListJobs_Blocking(t *testing.T) { func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { ci.Parallel(t) - s1, _, cleanupS1 := TestACLServer(t, func(c *Config) { + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer cleanupS1() @@ -1356,12 +1356,13 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { filter string nextToken string pageSize int32 + aclToken string expectedNextToken string expectedIDs []string expectedError string }{ { - name: "test00 all dev pool default NS", + name: "test00 dev pool default NS", pool: "dev-1", expectedIDs: []string{"job-00", "job-01", "job-04", "job-05", "job-10"}, }, @@ -1447,6 +1448,31 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { nextToken: "default.job-07", expectedIDs: []string{"job-10"}, }, + { + name: "test12 all pool wildcard NS", + pool: "all", + namespace: "*", + pageSize: 4, + expectedError: "Permission denied", + }, + { + name: "test13 all pool wildcard NS", + pool: "all", + namespace: "*", + aclToken: root.SecretID, + expectedIDs: []string{ // note these are sorted namespace-then-job-ID + "job-00", "job-01", "job-02", "job-04", "job-05", + "job-08", "job-10", "job-06", "job-03", "job-09", + }, + }, + { + name: "test14 all pool default NS", + pool: "all", + pageSize: 4, + aclToken: root.SecretID, + expectedNextToken: "default.job-05", + expectedIDs: []string{"job-00", "job-01", "job-02", "job-04"}, + }, } for _, tc := range cases { @@ -1462,6 +1488,9 @@ func 
TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) { }, } req.AuthToken = devToken.SecretID + if tc.aclToken != "" { + req.AuthToken = tc.aclToken + } var resp structs.NodePoolJobsResponse err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp) diff --git a/website/content/api-docs/node-pools.mdx b/website/content/api-docs/node-pools.mdx index 9250ba5d8..b28136dac 100644 --- a/website/content/api-docs/node-pools.mdx +++ b/website/content/api-docs/node-pools.mdx @@ -342,4 +342,96 @@ $ nomad operator api /v1/node/pool/prod-eng/nodes?os=true ] ``` + +# List Node Pool Jobs + +This endpoint lists the jobs in a node pool. + +| Method | Path | Produces | +| ------ | -------------------------------- | ------------------ | +| `GET` | `/v1/node/pool/:node_pool/jobs` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/nomad/api-docs#blocking-queries) and +[required ACLs](/nomad/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ----------------------------------- | +| `YES` | `namespace:read`
`node_pool:read` | + +### Parameters + +- `:node_pool` `(string: <required>)` - Specifies the node pool to list jobs. + +- `filter` `(string: "")` - Specifies the [expression](/nomad/api-docs#filtering) + used to filter the results. Consider using pagination to reduce resources used + to serve the request. + +- `meta` `(bool: false)` - If set, jobs returned will include a + [meta](/nomad/docs/job-specification/meta) field containing key-value pairs + provided in the job specification's `meta` block. + +- `namespace` `(string: "default")` - Specifies the target namespace. Specifying + `*` will return all jobs across all the authorized namespaces. + +- `next_token` `(string: "")` - This endpoint supports paging. The `next_token` + parameter accepts a string which identifies the next expected job. This value + can be obtained from the `X-Nomad-NextToken` header from the previous + response. + +- `per_page` `(int: 0)` - Specifies a maximum number of jobs to return for this + request. If omitted, the response is not paginated. The value of the + `X-Nomad-NextToken` header of the last response can be used as the + `next_token` of the next request to fetch additional pages. 
+ +### Sample Request + +```shell-session +$ nomad operator api /v1/node/pool/prod-eng/jobs +``` + +```shell-session +$ nomad operator api /v1/node/pool/prod-eng/jobs?namespace=prod +``` + +### Sample Response + +```json +[ + { + "ID": "example", + "ParentID": "", + "Name": "example", + "Type": "service", + "Priority": 50, + "Status": "pending", + "StatusDescription": "", + "JobSummary": { + "JobID": "example", + "Namespace": "default", + "Summary": { + "cache": { + "Queued": 1, + "Complete": 1, + "Failed": 0, + "Running": 0, + "Starting": 0, + "Lost": 0 + } + }, + "Children": { + "Pending": 0, + "Running": 0, + "Dead": 0 + }, + "CreateIndex": 52, + "ModifyIndex": 96 + }, + "CreateIndex": 52, + "ModifyIndex": 93, + "JobModifyIndex": 52 + } +] +``` + [api_scheduler_alog]: /nomad/api-docs/operator/scheduler#scheduleralgorithm