node pools: implement RPC to list jobs in a given node pool (#17396)

Implements the `NodePool.ListJobs` RPC, with pagination and filtering based on
the existing `Job.List` RPC.
Tim Gross 2023-06-05 15:36:52 -04:00 committed by GitHub
parent d1d4d22f8e
commit 2d16ec6c6f
6 changed files with 556 additions and 0 deletions
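For a sense of how the new endpoint is invoked, here is a minimal sketch based on the request/response shapes and the msgpack RPC calls exercised in the tests below; the `codec` and `token` values are assumed to come from a test-server setup such as `rpcClient` and `mock.CreatePolicyAndToken`:

req := &structs.NodePoolJobsRequest{
	Name: "dev-1", // node pool to list jobs for
	QueryOptions: structs.QueryOptions{
		Region:    "global",
		Namespace: structs.AllNamespacesSentinel, // list across all namespaces the token can read
	},
}
req.AuthToken = token.SecretID // token needs node_pool "read" on the pool

var resp structs.NodePoolJobsResponse
err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
// resp.Jobs holds *structs.JobListStub entries for jobs in the pool;
// resp.QueryMeta.NextToken carries the pagination cursor when PerPage is set.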

View File

@@ -4,6 +4,8 @@
package nomad
import (
"errors"
"fmt"
"net/http" "net/http"
"time" "time"
@@ -252,3 +254,132 @@ func (n *NodePool) DeleteNodePools(args *structs.NodePoolDeleteRequest, reply *s
reply.Index = index
return nil
}
// ListJobs is used to retrieve a list of jobs for a given node pool. It supports
// pagination and filtering.
func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.NodePoolJobsResponse) error {
authErr := n.srv.Authenticate(n.ctx, args)
if done, err := n.srv.forward("NodePool.ListJobs", args, args, reply); done {
return err
}
n.srv.MeasureRPCRate("node_pool", structs.RateMetricRead, args)
if authErr != nil {
return structs.ErrPermissionDenied
}
defer metrics.MeasureSince([]string{"nomad", "node_pool", "list_jobs"}, time.Now())
// Resolve ACL token and verify it has read capability for the pool.
aclObj, err := n.srv.ResolveACL(args)
if err != nil {
return err
}
if !aclObj.AllowNodePoolOperation(args.Name, acl.NodePoolCapabilityRead) {
return structs.ErrPermissionDenied
}
allowNsFunc := aclObj.AllowNsOpFunc(acl.NamespaceCapabilityListJobs)
namespace := args.RequestNamespace()
// Set up the blocking query. This largely mirrors the Job.List RPC but with
// an additional paginator filter for the node pool.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, store *state.StateStore) error {
// ensure the node pool exists
pool, err := store.NodePoolByName(ws, args.Name)
if err != nil {
return err
}
if pool == nil {
return nil
}
var iter memdb.ResultIterator
// Get the namespaces the user is allowed to access.
allowableNamespaces, err := allowedNSes(aclObj, store, allowNsFunc)
if errors.Is(err, structs.ErrPermissionDenied) {
// return empty jobs if token isn't authorized for any
// namespace, matching other endpoints
reply.Jobs = make([]*structs.JobListStub, 0)
} else if err != nil {
return err
} else {
filters := []paginator.Filter{
paginator.NamespaceFilter{
AllowableNamespaces: allowableNamespaces,
},
}
if namespace == structs.AllNamespacesSentinel {
iter, err = store.JobsByPool(ws, args.Name)
} else {
iter, err = store.JobsByNamespace(ws, namespace)
filters = append(filters,
paginator.GenericFilter{
Allow: func(raw interface{}) (bool, error) {
job := raw.(*structs.Job)
if job == nil || job.NodePool != args.Name {
return false, nil
}
return true, nil
},
})
}
if err != nil {
return err
}
tokenizer := paginator.NewStructsTokenizer(
iter,
paginator.StructsTokenizerOptions{
WithNamespace: true,
WithID: true,
},
)
var jobs []*structs.JobListStub
paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions,
func(raw interface{}) error {
job := raw.(*structs.Job)
summary, err := store.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil || summary == nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
jobs = append(jobs, job.Stub(summary, args.Fields))
return nil
})
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to create result paginator: %v", err)
}
nextToken, err := paginator.Page()
if err != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to read result page: %v", err)
}
reply.QueryMeta.NextToken = nextToken
reply.Jobs = jobs
}
// Use the last index that affected the jobs table or summary
jindex, err := store.Index("jobs")
if err != nil {
return err
}
sindex, err := store.Index("job_summary")
if err != nil {
return err
}
reply.Index = helper.Max(jindex, sindex)
// Set the query response
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return n.srv.blockingRPC(&opts)
}

View File

@@ -10,8 +10,10 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-set"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/testutil"
@@ -1095,3 +1097,390 @@ func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
})
}
}
func TestNodePoolEndpoint_ListJobs_ACLs(t *testing.T) {
ci.Parallel(t)
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
store := s1.fsm.State()
index := uint64(1000)
var err error
// Populate state with some node pools.
poolDev := &structs.NodePool{
Name: "dev-1",
Description: "test node pool for dev-1",
}
poolProd := &structs.NodePool{
Name: "prod-1",
Description: "test node pool for prod-1",
}
err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{
poolDev,
poolProd,
})
must.NoError(t, err)
// for referring to the jobs in assertions
jobIDs := map[string]string{}
// register jobs in all pools and all namespaces
for _, ns := range []string{"engineering", "system", "default"} {
index++
must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{{Name: ns}}))
for _, pool := range []string{"dev-1", "prod-1", "default"} {
job := mock.MinJob()
job.Namespace = ns
job.NodePool = pool
jobIDs[ns+"+"+pool] = job.ID
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
}
}
req := &structs.NodePoolJobsRequest{
Name: "dev-1",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.AllNamespacesSentinel},
}
// Expect failure for request without a token
var resp structs.NodePoolJobsResponse
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.EqError(t, err, structs.ErrPermissionDenied.Error())
// Management token can read any namespace / any pool
// var mgmtResp structs.NodePoolJobsResponse
req.AuthToken = root.SecretID
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.NoError(t, err)
must.Len(t, 3, resp.Jobs)
must.SliceContainsAll(t,
helper.ConvertSlice(resp.Jobs, func(j *structs.JobListStub) string { return j.ID }),
[]string{jobIDs["engineering+dev-1"], jobIDs["system+dev-1"], jobIDs["default+dev-1"]})
// Policy that allows read access to the dev-* and default pools but only one namespace
index++
devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools",
fmt.Sprintf("%s\n%s\n%s\n",
mock.NodePoolPolicy("dev-*", "read", nil),
mock.NodePoolPolicy("default", "read", nil),
mock.NamespacePolicy("engineering", "read", nil)),
)
req.AuthToken = devToken.SecretID
// with wildcard namespace
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.NoError(t, err)
must.Len(t, 1, resp.Jobs)
must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID)
// with specific allowed namespaces
req.Namespace = "engineering"
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.NoError(t, err)
must.Len(t, 1, resp.Jobs)
must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID)
// with disallowed namespace
req.Namespace = "system"
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.NoError(t, err)
must.Len(t, 0, resp.Jobs)
// with disallowed pool but allowed namespace
req.Namespace = "engineering"
req.Name = "prod-1"
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.EqError(t, err, structs.ErrPermissionDenied.Error())
}
func TestNodePoolEndpoint_ListJobs_Blocking(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
store := s1.fsm.State()
index := uint64(1000)
var err error
// Populate state with a node pool and a job in the default pool
poolDev := &structs.NodePool{
Name: "dev-1",
Description: "test node pool for dev-1",
}
err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{poolDev})
must.NoError(t, err)
job := mock.MinJob()
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
req := &structs.NodePoolJobsRequest{
Name: "default",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.AllNamespacesSentinel},
}
// List the job and get the index
var resp structs.NodePoolJobsResponse
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.Len(t, 1, resp.Jobs)
must.Eq(t, index, resp.Index)
must.Eq(t, "default", resp.Jobs[0].NodePool)
// Moving a job into a pool we're watching should trigger a watch
index++
time.AfterFunc(100*time.Millisecond, func() {
job = job.Copy()
job.NodePool = "dev-1"
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
})
req.Name = "dev-1"
req.MinQueryIndex = index
req.MaxQueryTime = 500 * time.Millisecond
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.Len(t, 1, resp.Jobs)
must.Eq(t, index, resp.Index)
// Moving a job out of a pool we're watching should trigger a watch
index++
time.AfterFunc(100*time.Millisecond, func() {
job = job.Copy()
job.NodePool = "default"
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
})
req.Name = "dev-1"
req.MinQueryIndex = index
req.MaxQueryTime = 500 * time.Millisecond
err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
must.Len(t, 0, resp.Jobs)
must.Eq(t, index, resp.Index)
}
func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
ci.Parallel(t)
s1, _, cleanupS1 := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
store := s1.fsm.State()
index := uint64(1000)
var err error
// Populate state with some node pools.
poolDev := &structs.NodePool{
Name: "dev-1",
Description: "test node pool for dev-1",
}
poolProd := &structs.NodePool{
Name: "prod-1",
Description: "test node pool for prod-1",
}
err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{
poolDev,
poolProd,
})
must.NoError(t, err)
index++
must.NoError(t, store.UpsertNamespaces(index,
[]*structs.Namespace{{Name: "non-default"}, {Name: "other"}}))
// create a set of jobs. these are in the order that the state store will
// return them from the iterator (sorted by key) for ease of writing tests
mocks := []struct {
name string
pool string
namespace string
status string
}{
{name: "job-00", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
{name: "job-01", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
{name: "job-02", pool: "default", namespace: "default", status: structs.JobStatusPending},
{name: "job-03", pool: "dev-1", namespace: "non-default", status: structs.JobStatusPending},
{name: "job-04", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning},
{name: "job-05", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning},
{name: "job-06", pool: "dev-1", namespace: "other", status: structs.JobStatusPending},
// job-07 is intentionally omitted for the missing-index assertion
{name: "job-08", pool: "prod-1", namespace: "default", status: structs.JobStatusRunning},
{name: "job-09", pool: "prod-1", namespace: "non-default", status: structs.JobStatusPending},
{name: "job-10", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
}
for _, m := range mocks {
job := mock.MinJob()
job.ID = m.name
job.Name = m.name
job.NodePool = m.pool
job.Status = m.status
job.Namespace = m.namespace
index++
job.CreateIndex = index
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
}
// Policy that allows access to 2 pools and any namespace
index++
devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools",
fmt.Sprintf("%s\n%s\n%s\n",
mock.NodePoolPolicy("dev-*", "read", nil),
mock.NodePoolPolicy("default", "read", nil),
mock.NamespacePolicy("*", "read", nil)),
)
cases := []struct {
name string
pool string
namespace string
filter string
nextToken string
pageSize int32
expectedNextToken string
expectedIDs []string
expectedError string
}{
{
name: "test00 all dev pool default NS",
pool: "dev-1",
expectedIDs: []string{"job-00", "job-01", "job-04", "job-05", "job-10"},
},
{
name: "test01 size-2 page-1 dev pool default NS",
pool: "dev-1",
pageSize: 2,
expectedNextToken: "default.job-04",
expectedIDs: []string{"job-00", "job-01"},
},
{
name: "test02 size-2 page-1 dev pool wildcard NS",
pool: "dev-1",
namespace: "*",
pageSize: 2,
expectedNextToken: "default.job-04",
expectedIDs: []string{"job-00", "job-01"},
},
{
name: "test03 size-2 page-2 dev pool default NS",
pool: "dev-1",
pageSize: 2,
nextToken: "default.job-04",
expectedNextToken: "default.job-10",
expectedIDs: []string{"job-04", "job-05"},
},
{
name: "test04 size-2 page-2 wildcard NS",
pool: "dev-1",
namespace: "*",
pageSize: 2,
nextToken: "default.job-04",
expectedNextToken: "default.job-10",
expectedIDs: []string{"job-04", "job-05"},
},
{
name: "test05 no valid results with filters",
pool: "dev-1",
pageSize: 2,
nextToken: "",
filter: `Name matches "not-job"`,
expectedIDs: []string{},
},
{
name: "test06 go-bexpr filter across namespaces",
pool: "dev-1",
namespace: "*",
filter: `Name matches "job-0[12345]"`,
expectedIDs: []string{"job-01", "job-04", "job-05", "job-03"},
},
{
name: "test07 go-bexpr filter with pagination",
pool: "dev-1",
namespace: "*",
filter: `Name matches "job-0[12345]"`,
pageSize: 3,
expectedNextToken: "non-default.job-03",
expectedIDs: []string{"job-01", "job-04", "job-05"},
},
{
name: "test08 go-bexpr filter in namespace",
pool: "dev-1",
namespace: "non-default",
filter: `Status == "pending"`,
expectedIDs: []string{"job-03"},
},
{
name: "test09 go-bexpr invalid expression",
pool: "dev-1",
filter: `NotValid`,
expectedError: "failed to read filter expression",
},
{
name: "test10 go-bexpr invalid field",
pool: "dev-1",
filter: `InvalidField == "value"`,
expectedError: "error finding value in datum",
},
{
name: "test11 missing index",
pool: "dev-1",
pageSize: 1,
nextToken: "default.job-07",
expectedIDs: []string{"job-10"},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &structs.NodePoolJobsRequest{
Name: tc.pool,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Filter: tc.filter,
PerPage: tc.pageSize,
NextToken: tc.nextToken,
},
}
req.AuthToken = devToken.SecretID
var resp structs.NodePoolJobsResponse
err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
if tc.expectedError == "" {
must.NoError(t, err)
} else {
must.Error(t, err)
must.ErrorContains(t, err, tc.expectedError)
return
}
got := set.FromFunc(resp.Jobs,
func(j *structs.JobListStub) string { return j.ID })
must.True(t, got.ContainsSlice(tc.expectedIDs),
must.Sprintf("unexpected page of jobs: %v", got))
must.Eq(t, tc.expectedNextToken, resp.QueryMeta.NextToken,
must.Sprint("unexpected NextToken"))
})
}
}

View File

@@ -237,6 +237,14 @@ func jobTableSchema() *memdb.TableSchema {
Conditional: jobIsPeriodic,
},
},
"pool": {
Name: "pool",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "NodePool",
},
},
},
}
}

View File

@@ -2287,6 +2287,20 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator,
return iter, nil
}
// JobsByPool returns an iterator over all jobs in a given node pool.
func (s *StateStore) JobsByPool(ws memdb.WatchSet, pool string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get("jobs", "pool", pool)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// JobSummaryByID returns a job summary object which matches a specific id.
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
txn := s.db.ReadTxn()

View File

@@ -181,3 +181,16 @@ type NodePoolDeleteRequest struct {
Names []string
WriteRequest
}
// NodePoolJobsRequest is used to make a request for the jobs in a specific node pool.
type NodePoolJobsRequest struct {
Name string
Fields *JobStubFields
QueryOptions
}
// NodePoolJobsResponse returns a list of jobs in a specific node pool.
type NodePoolJobsResponse struct {
Jobs []*JobListStub
QueryMeta
}
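Because the request embeds `QueryOptions` and the response embeds `QueryMeta`, pagination follows the usual Nomad pattern: set `PerPage` and feed `NextToken` from each response back into the next request. A rough client-side sketch under that assumption (the helper name is made up and not part of this commit; it reuses `msgpackrpc` and `net/rpc` as in the test file):

// listAllPoolJobs pages through NodePool.ListJobs until NextToken is empty.
func listAllPoolJobs(codec rpc.ClientCodec, token, pool string) ([]*structs.JobListStub, error) {
	req := &structs.NodePoolJobsRequest{
		Name: pool,
		QueryOptions: structs.QueryOptions{
			Region:  "global",
			PerPage: 100, // page size; the tests above use 2 and 3
		},
	}
	req.AuthToken = token

	var out []*structs.JobListStub
	for {
		var resp structs.NodePoolJobsResponse
		if err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp); err != nil {
			return nil, err
		}
		out = append(out, resp.Jobs...)
		if resp.QueryMeta.NextToken == "" {
			return out, nil // last page reached
		}
		req.QueryOptions.NextToken = resp.QueryMeta.NextToken
	}
}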

View File

@@ -392,6 +392,7 @@ those listed in [Key Metrics](#key-metrics) above.
| `nomad.nomad.namespace.list_namespace` | Time elapsed for `Namespace.ListNamespaces` | Nanoseconds | Summary | host |
| `nomad.nomad.namespace.upsert_namespaces` | Time elapsed for `Namespace.UpsertNamespaces` | Nanoseconds | Summary | host |
| `nomad.nomad.node_pool.list` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host |
| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.ListJobs` RPC call | Nanoseconds | Summary | host |
| `nomad.nomad.node_pool.get_node_pool` | Time elapsed for `NodePool.GetNodePool` RPC call | Nanoseconds | Summary | host |
| `nomad.nomad.node_pool.upsert_node_pools` | Time elapsed for `NodePool.UpsertNodePools` RPC call | Nanoseconds | Summary | host |
| `nomad.nomad.node_pool.delete_node_pools` | Time elapsed for `NodePool.DeleteNodePools` RPC call | Nanoseconds | Summary | host |