node pools: list nodes in pool (#17413)
This commit is contained in:
parent c26f01eefd
commit 2420c93179

@@ -710,6 +710,9 @@ func (a *ACL) AllowAgentWrite() bool {
// AllowNodeRead checks if read operations are allowed for a node
func (a *ACL) AllowNodeRead() bool {
	switch {
	// a is nil if ACLs are disabled.
	case a == nil:
		return true
	case a.management:
		return true
	case a.node == PolicyWrite:

@@ -1051,6 +1051,32 @@ func parseNode(req *http.Request, nodeID *string) {
	}
}

// parseNodeListStubFields parses query parameters related to node list stubs
// fields.
func parseNodeListStubFields(req *http.Request) (*structs.NodeStubFields, error) {
	fields := &structs.NodeStubFields{}

	// Parse resources field selection.
	resources, err := parseBool(req, "resources")
	if err != nil {
		return nil, err
	}
	if resources != nil {
		fields.Resources = *resources
	}

	// Parse OS field selection.
	os, err := parseBool(req, "os")
	if err != nil {
		return nil, err
	}
	if os != nil {
		fields.OS = *os
	}

	return fields, nil
}

// parseWriteRequest is a convenience method for endpoints that need to parse a
// write request.
func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteRequest) {

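As a quick illustration of what this helper accepts, a node list request selecting both stub fields might look like the following (a sketch; either query parameter may be used alone, and unknown keys are ignored):

```shell-session
$ nomad operator api "/v1/nodes?resources=true&os=true"
```
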
@@ -727,6 +727,74 @@ func TestParsePagination(t *testing.T) {
	}
}

func TestParseNodeListStubFields(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name        string
		req         string
		expected    *structs.NodeStubFields
		expectedErr string
	}{
		{
			name: "parse resources",
			req:  "/v1/nodes?resources=true",
			expected: &structs.NodeStubFields{
				Resources: true,
			},
		},
		{
			name: "parse os",
			req:  "/v1/nodes?os=true",
			expected: &structs.NodeStubFields{
				OS: true,
			},
		},
		{
			name: "no resources but with os",
			req:  "/v1/nodes?resources=false&os=true",
			expected: &structs.NodeStubFields{
				OS: true,
			},
		},
		{
			name:        "invalid resources value",
			req:         "/v1/nodes?resources=invalid",
			expectedErr: `Failed to parse value of "resources"`,
		},
		{
			name:        "invalid os value",
			req:         "/v1/nodes?os=invalid",
			expectedErr: `Failed to parse value of "os"`,
		},
		{
			name:     "invalid key is ignored",
			req:      "/v1/nodes?key=invalid",
			expected: &structs.NodeStubFields{},
		},
		{
			name:     "no field",
			req:      "/v1/nodes",
			expected: &structs.NodeStubFields{},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			req, err := http.NewRequest("GET", tc.req, nil)
			must.NoError(t, err)

			got, err := parseNodeListStubFields(req)
			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
			} else {
				must.NoError(t, err)
				must.Eq(t, tc.expected, got)
			}
		})
	}
}

// TestHTTP_VerifyHTTPSClient asserts that a client certificate signed by the
// appropriate CA is required when VerifyHTTPSClient=true.
func TestHTTP_VerifyHTTPSClient(t *testing.T) {

@@ -4,6 +4,7 @@
package agent

import (
	"fmt"
	"net/http"
	"strings"

@@ -21,25 +22,12 @@ func (s *HTTPServer) NodesRequest(resp http.ResponseWriter, req *http.Request) (
 		return nil, nil
 	}
 
-	args.Fields = &structs.NodeStubFields{}
-	// Parse resources field selection
-	resources, err := parseBool(req, "resources")
+	// Parse fields selection.
+	fields, err := parseNodeListStubFields(req)
 	if err != nil {
-		return nil, err
-	}
-	if resources != nil {
-		args.Fields.Resources = *resources
-	}
-
-	// Parse OS
-	os, err := parseBool(req, "os")
-	if err != nil {
-		return nil, err
-	}
-
-	if os != nil {
-		args.Fields.OS = *os
+		return nil, CodedError(http.StatusBadRequest, fmt.Errorf("Failed to parse node list fields: %v", err).Error())
 	}
+	args.Fields = fields
 
 	var out structs.NodeListResponse
 	if err := s.agent.RPC("Node.List", &args, &out); err != nil {

@@ -4,6 +4,7 @@
package agent

import (
	"fmt"
	"net/http"
	"strings"

@@ -24,6 +25,9 @@ func (s *HTTPServer) NodePoolsRequest(resp http.ResponseWriter, req *http.Reques
func (s *HTTPServer) NodePoolSpecificRequest(resp http.ResponseWriter, req *http.Request) (any, error) {
	path := strings.TrimPrefix(req.URL.Path, "/v1/node/pool/")
	switch {
	case strings.HasSuffix(path, "/nodes"):
		poolName := strings.TrimSuffix(path, "/nodes")
		return s.nodePoolNodesList(resp, req, poolName)
	default:
		return s.nodePoolCRUD(resp, req, path)
	}

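With this routing in place, a request such as the following (pool name illustrative) is dispatched to the new nodePoolNodesList handler, while every other path falls through to the existing CRUD handler:

```shell-session
$ nomad operator api /v1/node/pool/prod-eng/nodes
```
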
@@ -119,3 +123,37 @@ func (s *HTTPServer) nodePoolDelete(resp http.ResponseWriter, req *http.Request,
	setIndex(resp, out.Index)
	return nil, nil
}

func (s *HTTPServer) nodePoolNodesList(resp http.ResponseWriter, req *http.Request, poolName string) (interface{}, error) {
	args := structs.NodePoolNodesRequest{
		Name: poolName,
	}
	if s.parse(resp, req, &args.Region, &args.QueryOptions) {
		return nil, nil
	}

	// Parse node fields selection.
	fields, err := parseNodeListStubFields(req)
	if err != nil {
		return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Failed to parse node list fields: %v", err))
	}
	args.Fields = fields

	if args.Prefix != "" {
		// the prefix argument is ambiguous for this endpoint (does it refer to
		// the node pool name or the node IDs like /v1/nodes?) so the RPC
		// handler ignores it
		return nil, CodedError(http.StatusBadRequest, "prefix argument not allowed")
	}

	var out structs.NodePoolNodesResponse
	if err := s.agent.RPC("NodePool.ListNodes", &args, &out); err != nil {
		return nil, err
	}

	setMeta(resp, &out.QueryMeta)
	if out.Nodes == nil {
		out.Nodes = make([]*structs.NodeListStub, 0)
	}
	return out.Nodes, nil
}

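A sketch of the prefix rejection above (pool name and prefix value illustrative); the server replies with HTTP 400 and the "prefix argument not allowed" message rather than guessing what the prefix refers to:

```shell-session
$ nomad operator api "/v1/node/pool/prod-eng/nodes?prefix=f747"
```
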
@@ -312,3 +312,111 @@ func TestHTTP_NodePool_Delete(t *testing.T) {
		must.Nil(t, got)
	})
}

func TestHTTP_NodePool_NodesList(t *testing.T) {
	ci.Parallel(t)
	httpTest(t,
		func(c *Config) {
			// Disable client so it doesn't impact tests since we're registering
			// our own test nodes.
			c.Client.Enabled = false
		},
		func(s *TestAgent) {
			// Populate state with test data.
			pool1 := mock.NodePool()
			pool2 := mock.NodePool()
			args := structs.NodePoolUpsertRequest{
				NodePools: []*structs.NodePool{pool1, pool2},
			}
			var resp structs.GenericResponse
			err := s.Agent.RPC("NodePool.UpsertNodePools", &args, &resp)
			must.NoError(t, err)

			// Split test nodes between default, pool1, and pool2.
			nodesByPool := make(map[string][]*structs.Node)
			for i := 0; i < 10; i++ {
				node := mock.Node()
				switch i % 3 {
				case 0:
					// Leave node pool value empty so node goes to default.
				case 1:
					node.NodePool = pool1.Name
				case 2:
					node.NodePool = pool2.Name
				}
				nodeRegReq := structs.NodeRegisterRequest{
					Node: node,
					WriteRequest: structs.WriteRequest{
						Region: "global",
					},
				}
				var nodeRegResp structs.NodeUpdateResponse
				err := s.Agent.RPC("Node.Register", &nodeRegReq, &nodeRegResp)
				must.NoError(t, err)

				nodesByPool[node.NodePool] = append(nodesByPool[node.NodePool], node)
			}

			testCases := []struct {
				name          string
				pool          string
				args          string
				expectedNodes []*structs.Node
				expectedErr   string
				validateFn    func(*testing.T, []*structs.NodeListStub)
			}{
				{
					name:          "nodes in default",
					pool:          structs.NodePoolDefault,
					expectedNodes: nodesByPool[structs.NodePoolDefault],
					validateFn: func(t *testing.T, stubs []*structs.NodeListStub) {
						must.Nil(t, stubs[0].NodeResources)
					},
				},
				{
					name:          "nodes in pool1 with resources",
					pool:          pool1.Name,
					args:          "resources=true",
					expectedNodes: nodesByPool[pool1.Name],
					validateFn: func(t *testing.T, stubs []*structs.NodeListStub) {
						must.NotNil(t, stubs[0].NodeResources)
					},
				},
			}
			for _, tc := range testCases {
				t.Run(tc.name, func(t *testing.T) {
					// Make HTTP request.
					path := fmt.Sprintf("/v1/node/pool/%s/nodes?%s", tc.pool, tc.args)
					req, err := http.NewRequest("GET", path, nil)
					must.NoError(t, err)
					respW := httptest.NewRecorder()

					obj, err := s.Server.NodePoolSpecificRequest(respW, req)
					if tc.expectedErr != "" {
						must.ErrorContains(t, err, tc.expectedErr)
						return
					}
					must.NoError(t, err)

					// Verify response only has expected nodes.
					stubs := obj.([]*structs.NodeListStub)
					must.Len(t, len(tc.expectedNodes), stubs)
					for _, node := range tc.expectedNodes {
						must.SliceContainsFunc(t, stubs, node, func(s *structs.NodeListStub, n *structs.Node) bool {
							return s.ID == n.ID
						})
					}

					// Verify response.
					if tc.validateFn != nil {
						tc.validateFn(t, stubs)
					}

					// Verify response index.
					gotIndex, err := strconv.ParseUint(respW.HeaderMap.Get("X-Nomad-Index"), 10, 64)
					must.NoError(t, err)
					must.NonZero(t, gotIndex)
				})
			}
		})
}

@@ -262,7 +262,7 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No
 	if done, err := n.srv.forward("NodePool.ListJobs", args, args, reply); done {
 		return err
 	}
-	n.srv.MeasureRPCRate("node_pool", structs.RateMetricRead, args)
+	n.srv.MeasureRPCRate("node_pool", structs.RateMetricList, args)
 	if authErr != nil {
 		return structs.ErrPermissionDenied
 	}

@@ -383,3 +383,93 @@ func (n *NodePool) ListJobs(args *structs.NodePoolJobsRequest, reply *structs.No
	}}
	return n.srv.blockingRPC(&opts)
}

// ListNodes is used to retrieve a list of nodes for a given node pool. It
// supports pagination and filtering.
func (n *NodePool) ListNodes(args *structs.NodePoolNodesRequest, reply *structs.NodePoolNodesResponse) error {
	authErr := n.srv.Authenticate(n.ctx, args)
	if done, err := n.srv.forward("NodePool.ListNodes", args, args, reply); done {
		return err
	}
	n.srv.MeasureRPCRate("node_pool", structs.RateMetricList, args)
	if authErr != nil {
		return structs.ErrPermissionDenied
	}
	defer metrics.MeasureSince([]string{"nomad", "node_pool", "list_nodes"}, time.Now())

	// Resolve ACL token and verify it has read capability for nodes and the
	// node pool.
	aclObj, err := n.srv.ResolveACL(args)
	if err != nil {
		return err
	}

	allowed := aclObj.AllowNodeRead() &&
		aclObj.AllowNodePoolOperation(args.Name, acl.NodePoolCapabilityRead)
	if !allowed {
		return structs.ErrPermissionDenied
	}

	// Setup blocking query.
	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		run: func(ws memdb.WatchSet, store *state.StateStore) error {
			// Verify node pool exists.
			pool, err := store.NodePoolByName(ws, args.Name)
			if err != nil {
				return err
			}
			if pool == nil {
				return nil
			}

			// Fetch nodes in the pool.
			var iter memdb.ResultIterator
			if args.Name == structs.NodePoolAll {
				iter, err = store.Nodes(ws)
			} else {
				iter, err = store.NodesByNodePool(ws, args.Name)
			}
			if err != nil {
				return err
			}

			// Setup paginator by node ID.
			pageOpts := paginator.StructsTokenizerOptions{
				WithID: true,
			}
			tokenizer := paginator.NewStructsTokenizer(iter, pageOpts)

			var nodes []*structs.NodeListStub
			pager, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions,
				func(raw interface{}) error {
					node := raw.(*structs.Node)
					nodes = append(nodes, node.Stub(args.Fields))
					return nil
				})
			if err != nil {
				return structs.NewErrRPCCodedf(http.StatusBadRequest, "failed to create result paginator: %v", err)
			}

			nextToken, err := pager.Page()
			if err != nil {
				return structs.NewErrRPCCodedf(http.StatusBadRequest, "failed to read result page: %v", err)
			}

			reply.QueryMeta.NextToken = nextToken
			reply.Nodes = nodes

			// Use the last index that affected the nodes table.
			index, err := store.Index("nodes")
			if err != nil {
				return err
			}
			reply.Index = helper.Max(1, index)

			// Set the query response.
			n.srv.setQueryMeta(&reply.QueryMeta)
			return nil
		}}
	return n.srv.blockingRPC(&opts)
}

@@ -1482,5 +1482,328 @@ func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
			must.Sprint("unexpected NextToken"))
		})
	}

}

func TestNodePoolEndpoint_ListNodes(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()

	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Populate state with test data.
	pool1 := mock.NodePool()
	pool2 := mock.NodePool()
	err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool1, pool2})
	must.NoError(t, err)

	// Split test nodes between default, pool1, and pool2.
	for i := 0; i < 9; i++ {
		node := mock.Node()
		switch i % 3 {
		case 0:
			node.ID = fmt.Sprintf("00000000-0000-0000-0000-0000000000%02d", i/3)
			node.NodePool = structs.NodePoolDefault
		case 1:
			node.ID = fmt.Sprintf("11111111-0000-0000-0000-0000000000%02d", i/3)
			node.NodePool = pool1.Name
		case 2:
			node.ID = fmt.Sprintf("22222222-0000-0000-0000-0000000000%02d", i/3)
			node.NodePool = pool2.Name
		}
		switch i % 2 {
		case 0:
			node.Attributes["os.name"] = "Windows"
		case 1:
			node.Attributes["os.name"] = "Linux"
		}
		err := s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, uint64(1000+1), node)
		must.NoError(t, err)
	}

	testCases := []struct {
		name              string
		req               *structs.NodePoolNodesRequest
		expectedErr       string
		expected          []string
		expectedNextToken string
	}{
		{
			name: "nodes in default",
			req: &structs.NodePoolNodesRequest{
				Name: structs.NodePoolDefault,
			},
			expected: []string{
				"00000000-0000-0000-0000-000000000000",
				"00000000-0000-0000-0000-000000000001",
				"00000000-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in all",
			req: &structs.NodePoolNodesRequest{
				Name: structs.NodePoolAll,
			},
			expected: []string{
				"00000000-0000-0000-0000-000000000000",
				"00000000-0000-0000-0000-000000000001",
				"00000000-0000-0000-0000-000000000002",
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
				"11111111-0000-0000-0000-000000000002",
				"22222222-0000-0000-0000-000000000000",
				"22222222-0000-0000-0000-000000000001",
				"22222222-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool1 with OS",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					OS: true,
				},
			},
			expected: []string{
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
				"11111111-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool2 filtered by OS",
			req: &structs.NodePoolNodesRequest{
				Name: pool2.Name,
				QueryOptions: structs.QueryOptions{
					Filter: `Attributes["os.name"] == "Windows"`,
				},
			},
			expected: []string{
				"22222222-0000-0000-0000-000000000000",
				"22222222-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool1 paginated with resources",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					Resources: true,
				},
				QueryOptions: structs.QueryOptions{
					PerPage: 2,
				},
			},
			expected: []string{
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
			},
			expectedNextToken: "11111111-0000-0000-0000-000000000002",
		},
		{
			name: "nodes in pool1 paginated with resources - page 2",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					Resources: true,
				},
				QueryOptions: structs.QueryOptions{
					PerPage:   2,
					NextToken: "11111111-0000-0000-0000-000000000002",
				},
			},
			expected: []string{
				"11111111-0000-0000-0000-000000000002",
			},
			expectedNextToken: "",
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Always send the request to the global region.
			tc.req.Region = "global"

			// Make node pool nodes request.
			var resp structs.NodePoolNodesResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", tc.req, &resp)

			// Check response.
			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				must.SliceEmpty(t, resp.Nodes)
			} else {
				must.NoError(t, err)

				got := make([]string, len(resp.Nodes))
				for i, stub := range resp.Nodes {
					got[i] = stub.ID
				}
				must.Eq(t, tc.expected, got)
				must.Eq(t, tc.expectedNextToken, resp.NextToken)

				if tc.req.Fields != nil {
					if tc.req.Fields.Resources {
						must.NotNil(t, resp.Nodes[0].NodeResources)
						must.NotNil(t, resp.Nodes[0].ReservedResources)
					}
					if tc.req.Fields.OS {
						must.NotEq(t, "", resp.Nodes[0].Attributes["os.name"])
					}
				}
			}
		})
	}
}

func TestNodePoolEndpoint_ListNodes_ACL(t *testing.T) {
	ci.Parallel(t)

	s, root, cleanupS := TestACLServer(t, nil)
	defer cleanupS()

	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Populate state.
	pool := mock.NodePool()
	pool.Name = "dev-1"
	err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
	must.NoError(t, err)

	node := mock.Node()
	node.NodePool = pool.Name
	err = s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, node)
	must.NoError(t, err)

	// Create test ACL tokens.
	validToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1002, "valid",
		fmt.Sprintf("%s\n%s", mock.NodePoolPolicy("dev-*", "read", nil), mock.NodePolicy("read")),
	)
	poolOnlyToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1004, "pool-only",
		mock.NodePoolPolicy("dev-*", "read", nil),
	)
	nodeOnlyToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1006, "node-only",
		mock.NodePolicy("read"),
	)
	noPolicyToken := mock.CreateToken(t, s.fsm.State(), 1008, nil)

	testCases := []struct {
		name        string
		pool        string
		token       string
		expected    []string
		expectedErr string
	}{
		{
			name:     "management token is allowed",
			token:    root.SecretID,
			pool:     pool.Name,
			expected: []string{node.ID},
		},
		{
			name:     "valid token is allowed",
			token:    validToken.SecretID,
			pool:     pool.Name,
			expected: []string{node.ID},
		},
		{
			name:        "pool only not enough",
			token:       poolOnlyToken.SecretID,
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "node only not enough",
			token:       nodeOnlyToken.SecretID,
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no policy not allowed",
			token:       noPolicyToken.SecretID,
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "token not allowed for pool",
			token:       validToken.SecretID,
			pool:        structs.NodePoolDefault,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Make node pool nodes request.
			req := &structs.NodePoolNodesRequest{
				Name: tc.pool,
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					AuthToken: tc.token,
				},
			}
			var resp structs.NodePoolNodesResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)
			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				must.SliceEmpty(t, resp.Nodes)
			} else {
				must.NoError(t, err)

				// Check response.
				got := make([]string, len(resp.Nodes))
				for i, node := range resp.Nodes {
					got[i] = node.ID
				}
				must.Eq(t, tc.expected, got)
			}
		})
	}
}

func TestNodePoolEndpoint_ListNodes_BlockingQuery(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()

	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Populate state with some node pools.
	pool := mock.NodePool()
	err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
	must.NoError(t, err)

	// Register node in pool.
	// Insert triggers watchers.
	node := mock.Node()
	node.NodePool = pool.Name
	time.AfterFunc(100*time.Millisecond, func() {
		s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, node)
	})

	req := &structs.NodePoolNodesRequest{
		Name: pool.Name,
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 1000,
		},
	}
	var resp structs.NodePoolNodesResponse
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1001, resp.Index)

	// Delete triggers watchers.
	time.AfterFunc(100*time.Millisecond, func() {
		s.fsm.State().DeleteNode(structs.MsgTypeTestSetup, 1002, []string{node.ID})
	})

	req.MinQueryIndex = 1001
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1002, resp.Index)
}

@@ -160,6 +160,14 @@ func nodeTableSchema() *memdb.TableSchema {
					Field: "SecretID",
				},
			},
			"node_pool": {
				Name:         "node_pool",
				AllowMissing: false,
				Unique:       false,
				Indexer: &memdb.StringFieldIndex{
					Field: "NodePool",
				},
			},
		},
	}
}

@@ -1635,6 +1635,20 @@ func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*struct
	return nil, nil
}

// NodesByNodePool returns an iterator over all nodes that are part of the
// given node pool.
func (s *StateStore) NodesByNodePool(ws memdb.WatchSet, pool string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("nodes", "node_pool", pool)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}

// Nodes returns an iterator over all the nodes
func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

@@ -2012,6 +2012,69 @@ func TestStateStore_NodesByIDPrefix(t *testing.T) {
	}
}

func TestStateStore_NodesByNodePool(t *testing.T) {
	ci.Parallel(t)

	state := testStateStore(t)

	pool := mock.NodePool()
	err := state.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
	must.NoError(t, err)

	node1 := mock.Node()
	node1.NodePool = structs.NodePoolDefault
	err = state.UpsertNode(structs.MsgTypeTestSetup, 1001, node1)
	must.NoError(t, err)

	node2 := mock.Node()
	node2.NodePool = pool.Name
	err = state.UpsertNode(structs.MsgTypeTestSetup, 1002, node2)
	must.NoError(t, err)

	testCases := []struct {
		name     string
		pool     string
		expected []string
	}{
		{
			name: "default",
			pool: structs.NodePoolDefault,
			expected: []string{
				node1.ID,
			},
		},
		{
			name: "pool",
			pool: pool.Name,
			expected: []string{
				node2.ID,
			},
		},
		{
			name:     "empty pool",
			pool:     "",
			expected: []string{},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Create watcher to test that getters don't cause it to fire.
			ws := memdb.NewWatchSet()

			iter, err := state.NodesByNodePool(ws, tc.pool)
			must.NoError(t, err)

			got := []string{}
			for raw := iter.Next(); raw != nil; raw = iter.Next() {
				got = append(got, raw.(*structs.Node).ID)
			}

			must.SliceContainsAll(t, tc.expected, got)
			must.False(t, watchFired(ws))
		})
	}
}

func TestStateStore_UpsertJob_Job(t *testing.T) {
	ci.Parallel(t)

@@ -182,6 +182,19 @@ type NodePoolDeleteRequest struct {
	WriteRequest
}

// NodePoolNodesRequest is used to list all nodes that are part of a node pool.
type NodePoolNodesRequest struct {
	Name   string
	Fields *NodeStubFields
	QueryOptions
}

// NodePoolNodesResponse is used to return a list of nodes in the node pool.
type NodePoolNodesResponse struct {
	Nodes []*NodeListStub
	QueryMeta
}

// NodePoolJobsRequest is used to make a request for the jobs in a specific node pool.
type NodePoolJobsRequest struct {
	Name string

@@ -210,4 +210,136 @@ The table below shows this endpoint's support for
$ nomad operator api -X DELETE /v1/node/pool/prod-eng
```

## List Node Pool Nodes

This endpoint lists the nodes in a node pool.

| Method | Path                             | Produces           |
| ------ | -------------------------------- | ------------------ |
| `GET`  | `/v1/node/pool/:node_pool/nodes` | `application/json` |

The table below shows this endpoint's support for
[blocking queries](/nomad/api-docs#blocking-queries) and
[required ACLs](/nomad/api-docs#acls).

| Blocking Queries | ACL Required                        |
| ---------------- | ----------------------------------- |
| `YES`            | `node:read` <br /> `node_pool:read` |

### Parameters

- `:node_pool` `(string: <required>)` - Specifies the node pool whose nodes
  will be listed.

- `next_token` `(string: "")` - This endpoint supports paging. The `next_token`
  parameter accepts a string which identifies the next expected node. This
  value can be obtained from the `X-Nomad-NextToken` header from the previous
  response.

- `per_page` `(int: 0)` - Specifies a maximum number of nodes to return for
  this request. If omitted, the response is not paginated. The value of the
  `X-Nomad-NextToken` header of the last response can be used as the
  `next_token` of the next request to fetch additional pages.

- `filter` `(string: "")` - Specifies the [expression](/nomad/api-docs#filtering)
  used to filter the results. Consider using pagination to reduce the resources
  used to serve the request.

- `resources` `(bool: false)` - Specifies whether or not to include the
  `NodeResources` and `ReservedResources` fields in the response.

- `os` `(bool: false)` - Specifies whether or not to include special attributes
  such as the operating system name in the response.

### Sample Request

```shell-session
$ nomad operator api /v1/node/pool/prod-eng/nodes
```

```shell-session
$ nomad operator api /v1/node/pool/prod-eng/nodes?os=true
```
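
A hypothetical paginated request combining these parameters; the pool name and token are illustrative, with the token taken from the `X-Nomad-NextToken` header of the previous page:

```shell-session
$ nomad operator api "/v1/node/pool/prod-eng/nodes?per_page=2&resources=true"
$ nomad operator api "/v1/node/pool/prod-eng/nodes?per_page=2&resources=true&next_token=f7476465-4d6e-c0de-26d0-e383c49be941"
```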

### Sample Response

```json
[
  {
    "Address": "10.138.0.5",
    "Attributes": {
      "os.name": "ubuntu"
    },
    "CreateIndex": 6,
    "Datacenter": "dc1",
    "Drain": false,
    "Drivers": {
      "java": {
        "Attributes": {
          "driver.java.runtime": "OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-1~deb9u1-b12)",
          "driver.java.vm": "OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)",
          "driver.java.version": "openjdk version \"1.8.0_162"
        },
        "Detected": true,
        "HealthDescription": "",
        "Healthy": true,
        "UpdateTime": "2018-04-11T23:33:48.781948669Z"
      },
      "qemu": {
        "Attributes": null,
        "Detected": false,
        "HealthDescription": "",
        "Healthy": false,
        "UpdateTime": "2018-04-11T23:33:48.7819898Z"
      },
      "rkt": {
        "Attributes": {
          "driver.rkt.appc.version": "0.8.11",
          "driver.rkt.volumes.enabled": "1",
          "driver.rkt.version": "1.29.0"
        },
        "Detected": true,
        "HealthDescription": "Driver rkt is detected: true",
        "Healthy": true,
        "UpdateTime": "2018-04-11T23:34:48.81079772Z"
      },
      "docker": {
        "Attributes": {
          "driver.docker.bridge_ip": "172.17.0.1",
          "driver.docker.version": "18.03.0-ce",
          "driver.docker.volumes.enabled": "1"
        },
        "Detected": true,
        "HealthDescription": "Driver is available and responsive",
        "Healthy": true,
        "UpdateTime": "2018-04-11T23:34:48.713720323Z"
      },
      "exec": {
        "Attributes": {},
        "Detected": true,
        "HealthDescription": "Driver exec is detected: true",
        "Healthy": true,
        "UpdateTime": "2018-04-11T23:34:48.711026521Z"
      },
      "raw_exec": {
        "Attributes": {},
        "Detected": true,
        "HealthDescription": "",
        "Healthy": true,
        "UpdateTime": "2018-04-11T23:33:48.710448534Z"
      }
    },
    "ID": "f7476465-4d6e-c0de-26d0-e383c49be941",
    "LastDrain": null,
    "ModifyIndex": 2526,
    "Name": "nomad-4",
    "NodeClass": "",
    "NodePool": "prod-eng",
    "SchedulingEligibility": "eligible",
    "Status": "ready",
    "StatusDescription": "",
    "Version": "0.8.0-rc1"
  }
]
```

[api_scheduler_alog]: /nomad/api-docs/operator/scheduler#scheduleralgorithm

@@ -392,7 +392,8 @@ those listed in [Key Metrics](#key-metrics) above.
 | `nomad.nomad.namespace.list_namespace` | Time elapsed for `Namespace.ListNamespaces` | Nanoseconds | Summary | host |
 | `nomad.nomad.namespace.upsert_namespaces` | Time elapsed for `Namespace.UpsertNamespaces` | Nanoseconds | Summary | host |
 | `nomad.nomad.node_pool.list` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host |
-| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.List` RPC call | Nanoseconds | Summary | host |
+| `nomad.nomad.node_pool.list_jobs` | Time elapsed for `NodePool.ListJobs` RPC call | Nanoseconds | Summary | host |
+| `nomad.nomad.node_pool.list_nodes` | Time elapsed for `NodePool.ListNodes` RPC call | Nanoseconds | Summary | host |
 | `nomad.nomad.node_pool.get_node_pool` | Time elapsed for `NodePool.GetNodePool` RPC call | Nanoseconds | Summary | host |
 | `nomad.nomad.node_pool.upsert_node_pools` | Time elapsed for `NodePool.UpsertNodePools` RPC call | Nanoseconds | Summary | host |
 | `nomad.nomad.node_pool.delete_node_pools` | Time elapsed for `NodePool.DeleteNodePools` RPC call | Nanoseconds | Summary | host |