node pools: implement HTTP API to list jobs in pool (#17431)

Implements the HTTP API associated with the `NodePool.ListJobs` RPC, including
the `api` package for the public API and documentation.

Update the `NodePool.ListJobs` RPC to fix the missing handling of the special
"all" pool.
This commit is contained in:
Tim Gross 2023-06-06 11:40:13 -04:00 committed by GitHub
parent 2420c93179
commit c0f2295510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 271 additions and 15 deletions

View File

@ -5,6 +5,7 @@ package api
import (
"errors"
"fmt"
"net/url"
)
@ -88,6 +89,18 @@ func (n *NodePools) Delete(name string, w *WriteOptions) (*WriteMeta, error) {
return wm, nil
}
// ListJobs is used to list all the jobs in a node pool.
func (n *NodePools) ListJobs(poolName string, q *QueryOptions) ([]*JobListStub, *QueryMeta, error) {
var resp []*JobListStub
qm, err := n.client.query(
fmt.Sprintf("/v1/node/pool/%s/jobs", url.PathEscape(poolName)),
&resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}
// NodePool is used to serialize a node pool.
type NodePool struct {
Name string `hcl:"name,label"`

View File

@ -28,6 +28,9 @@ func (s *HTTPServer) NodePoolSpecificRequest(resp http.ResponseWriter, req *http
case strings.HasSuffix(path, "/nodes"):
poolName := strings.TrimSuffix(path, "/nodes")
return s.nodePoolNodesList(resp, req, poolName)
case strings.HasSuffix(path, "/jobs"):
poolName := strings.TrimSuffix(path, "/jobs")
return s.nodePoolJobList(resp, req, poolName)
default:
return s.nodePoolCRUD(resp, req, path)
}
@ -125,6 +128,10 @@ func (s *HTTPServer) nodePoolDelete(resp http.ResponseWriter, req *http.Request,
}
func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Request, poolName string) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
args := structs.NodePoolNodesRequest{
Name: poolName,
}
@ -157,3 +164,44 @@ func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Reque
}
return out.Nodes, nil
}
func (s *HTTPServer) nodePoolJobList(resp http.ResponseWriter, req *http.Request, poolName string) (any, error) {
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
args := structs.NodePoolJobsRequest{
Name: poolName,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
if args.Prefix != "" {
// the prefix argument is ambiguous for this endpoint (does it refer to
// the node pool name or the job names like /v1/jobs?) so the RPC
// handler ignores it
return nil, CodedError(http.StatusBadRequest, "prefix argument not allowed")
}
// Parse meta query param
args.Fields = &structs.JobStubFields{}
jobMeta, err := parseBool(req, "meta")
if err != nil {
return nil, err
}
if jobMeta != nil {
args.Fields.Meta = *jobMeta
}
var out structs.NodePoolJobsResponse
if err := s.agent.RPC("NodePool.ListJobs", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Jobs == nil {
out.Jobs = make([]*structs.JobListStub, 0)
}
return out.Jobs, nil
}

View File

@ -420,3 +420,68 @@ func TestHTTP_NodePool_NodesList(t *testing.T) {
}
})
}
func TestHTTP_NodePool_JobsList(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
pool1, pool2 := mock.NodePool(), mock.NodePool()
npUpReq := structs.NodePoolUpsertRequest{
NodePools: []*structs.NodePool{pool1, pool2},
}
var npUpResp structs.GenericResponse
err := s.Agent.RPC("NodePool.UpsertNodePools", &npUpReq, &npUpResp)
must.NoError(t, err)
for _, poolName := range []string{pool1.Name, "default"} {
for i := 0; i < 2; i++ {
job := mock.MinJob()
job.NodePool = poolName
jobRegReq := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var jobRegResp structs.JobRegisterResponse
must.NoError(t, s.Agent.RPC("Job.Register", &jobRegReq, &jobRegResp))
}
}
// Make HTTP request to occupied pool
req, err := http.NewRequest(http.MethodGet,
fmt.Sprintf("/v1/node/pool/%s/jobs", pool1.Name), nil)
must.NoError(t, err)
respW := httptest.NewRecorder()
obj, err := s.Server.NodePoolSpecificRequest(respW, req)
must.NoError(t, err)
must.SliceLen(t, 2, obj.([]*structs.JobListStub))
// Verify response index.
gotIndex, err := strconv.ParseUint(respW.HeaderMap.Get("X-Nomad-Index"), 10, 64)
must.NoError(t, err)
must.NonZero(t, gotIndex)
// Make HTTP request to empty pool
req, err = http.NewRequest(http.MethodGet,
fmt.Sprintf("/v1/node/pool/%s/jobs", pool2.Name), nil)
must.NoError(t, err)
respW = httptest.NewRecorder()
obj, err = s.Server.NodePoolSpecificRequest(respW, req)
must.NoError(t, err)
must.SliceLen(t, 0, obj.([]*structs.JobListStub))
// Make HTTP request to the "all"" pool
req, err = http.NewRequest(http.MethodGet, "/v1/node/pool/all/jobs", nil)
must.NoError(t, err)
respW = httptest.NewRecorder()
obj, err = s.Server.NodePoolSpecificRequest(respW, req)
must.NoError(t, err)
must.SliceLen(t, 4, obj.([]*structs.JobListStub))
})
}

View File

@ -312,21 +312,30 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No
},
}
if namespace == structs.AllNamespacesSentinel {
iter, err = store.JobsByPool(ws, args.Name)
if args.Name == structs.NodePoolAll {
if namespace == structs.AllNamespacesSentinel {
iter, err = store.Jobs(ws)
} else {
iter, err = store.JobsByNamespace(ws, namespace)
}
} else {
iter, err = store.JobsByNamespace(ws, namespace)
filters = append(filters,
paginator.GenericFilter{
Allow: func(raw interface{}) (bool, error) {
job := raw.(*structs.Job)
if job == nil || job.NodePool != args.Name {
return false, nil
}
return true, nil
},
})
if namespace == structs.AllNamespacesSentinel {
iter, err = store.JobsByPool(ws, args.Name)
} else {
iter, err = store.JobsByNamespace(ws, namespace)
filters = append(filters,
paginator.GenericFilter{
Allow: func(raw interface{}) (bool, error) {
job := raw.(*structs.Job)
if job == nil || job.NodePool != args.Name {
return false, nil
}
return true, nil
},
})
}
}
if err != nil {
return err
}

View File

@ -1278,7 +1278,7 @@ func TestNodePoolEndpoint_ListJobs_Blocking(t *testing.T) {
func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
ci.Parallel(t)
s1, _, cleanupS1 := TestACLServer(t, func(c *Config) {
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
@ -1356,12 +1356,13 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
filter string
nextToken string
pageSize int32
aclToken string
expectedNextToken string
expectedIDs []string
expectedError string
}{
{
name: "test00 all dev pool default NS",
name: "test00 dev pool default NS",
pool: "dev-1",
expectedIDs: []string{"job-00", "job-01", "job-04", "job-05", "job-10"},
},
@ -1447,6 +1448,31 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
nextToken: "default.job-07",
expectedIDs: []string{"job-10"},
},
{
name: "test12 all pool wildcard NS",
pool: "all",
namespace: "*",
pageSize: 4,
expectedError: "Permission denied",
},
{
name: "test13 all pool wildcard NS",
pool: "all",
namespace: "*",
aclToken: root.SecretID,
expectedIDs: []string{ // note these are sorted namespace-then-job-ID
"job-00", "job-01", "job-02", "job-04", "job-05",
"job-08", "job-10", "job-06", "job-03", "job-09",
},
},
{
name: "test14 all pool default NS",
pool: "all",
pageSize: 4,
aclToken: root.SecretID,
expectedNextToken: "default.job-05",
expectedIDs: []string{"job-00", "job-01", "job-02", "job-04"},
},
}
for _, tc := range cases {
@ -1462,6 +1488,9 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
},
}
req.AuthToken = devToken.SecretID
if tc.aclToken != "" {
req.AuthToken = tc.aclToken
}
var resp structs.NodePoolJobsResponse
err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)

View File

@ -342,4 +342,96 @@ $ nomad operator api /v1/node/pool/prod-eng/nodes?os=true
]
```
# List Node Pool Jobs
This endpoint lists the jobs in a node pool.
| Method | Path | Produces |
| ------ | -------------------------------- | ------------------ |
| `GET` | `/v1/node/pool/:node_pool/jobs` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/nomad/api-docs#blocking-queries) and
[required ACLs](/nomad/api-docs#acls).
| Blocking Queries | ACL Required |
| ---------------- | ----------------------------------- |
| `YES` | `namespace:read` <br /> `node_pool:read` |
### Parameters
- `:node_pool` `(string: <required>)`- Specifies the node pool to list jobs.
- `filter` `(string: "")` - Specifies the [expression](/nomad/api-docs#filtering)
used to filter the results. Consider using pagination to reduce resource used
to serve the request.
- `meta` `(bool: false)` - If set, jobs returned will include a
[meta](/nomad/docs/job-specification/meta) field containing key-value pairs
provided in the job specification's `meta` block.
- `namespace` `(string: "default")` - Specifies the target namespace. Specifying
`*` will return all jobs across all the authorized namespaces.
- `next_token` `(string: "")` - This endpoint supports paging. The `next_token`
parameter accepts a string which identifies the next expected job. This value
can be obtained from the `X-Nomad-NextToken` header from the previous
response.
- `per_page` `(int: 0)` - Specifies a maximum number of jobs to return for this
request. If omitted, the response is not paginated. The value of the
`X-Nomad-NextToken` header of the last response can be used as the
`next_token` of the next request to fetch additional pages.
### Sample Request
```shell-session
$ nomad operator api /v1/node/pool/prod-eng/jobs
```
```shell-session
$ nomad operator api /v1/node/pool/prod-eng/jobs?namespace=prod
```
### Sample Response
```json
[
{
"ID": "example",
"ParentID": "",
"Name": "example",
"Type": "service",
"Priority": 50,
"Status": "pending",
"StatusDescription": "",
"JobSummary": {
"JobID": "example",
"Namespace": "default",
"Summary": {
"cache": {
"Queued": 1,
"Complete": 1,
"Failed": 0,
"Running": 0,
"Starting": 0,
"Lost": 0
}
},
"Children": {
"Pending": 0,
"Running": 0,
"Dead": 0
},
"CreateIndex": 52,
"ModifyIndex": 96
},
"CreateIndex": 52,
"ModifyIndex": 93,
"JobModifyIndex": 52
}
]
```
[api_scheduler_alog]: /nomad/api-docs/operator/scheduler#scheduleralgorithm