open-nomad/nomad/node_pool_endpoint_test.go

1966 lines
51 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nomad
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-set"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
)
// TestNodePoolEndpoint_List exercises the NodePool.List RPC on a server
// created with TestServer, covering ordering, prefix matching, filter
// expressions, and pagination.
func TestNodePoolEndpoint_List(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Seed the state store with dev and prod pools. The expectations below
	// also list "all" and "default", which this test never inserts itself.
	seed := []*structs.NodePool{
		{
			Name:        "dev-1",
			Description: "test node pool for dev-1",
			Meta: map[string]string{
				"env":   "dev",
				"index": "1",
			},
		},
		{
			Name:        "dev-2",
			Description: "test node pool for dev-2",
			Meta: map[string]string{
				"env":   "dev",
				"index": "2",
			},
		},
		{
			Name:        "dev-no-meta",
			Description: "test node pool for dev without meta",
		},
		{
			Name:        "prod-1",
			Description: "test node pool for prod-1",
			Meta: map[string]string{
				"env":   "prod",
				"index": "1",
			},
		},
		{
			Name:        "prod-2",
			Description: "test node pool for prod-2",
			Meta: map[string]string{
				"env":   "prod",
				"index": "2",
			},
		},
	}
	must.NoError(t, srv.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, seed))

	testCases := []struct {
		name              string
		req               *structs.NodePoolListRequest
		expectedErr       string
		expected          []string
		expectedNextToken string
	}{
		{
			name: "list all",
			req:  &structs.NodePoolListRequest{},
			expected: []string{
				"all", "default",
				"dev-1", "dev-2", "dev-no-meta",
				"prod-1", "prod-2",
			},
		},
		{
			name: "list all reverse",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Reverse: true,
				},
			},
			expected: []string{
				"prod-2", "prod-1",
				"dev-no-meta", "dev-2", "dev-1",
				"default", "all",
			},
		},
		{
			name: "filter by prefix",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Prefix: "prod-",
				},
			},
			expected: []string{"prod-1", "prod-2"},
		},
		{
			name: "filter by prefix reverse",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Prefix:  "prod-",
					Reverse: true,
				},
			},
			expected: []string{"prod-2", "prod-1"},
		},
		{
			name: "filter by prefix no match",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Prefix: "invalid-",
				},
			},
			expected: []string{},
		},
		{
			name: "filter by expression",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Filter: `Meta.env == "dev"`,
				},
			},
			expected: []string{"dev-1", "dev-2"},
		},
		{
			name: "filter by expression reverse",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					Filter:  `Meta.env == "dev"`,
					Reverse: true,
				},
			},
			expected: []string{"dev-2", "dev-1"},
		},
		{
			name: "paginate per-page=2 page=1",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage: 2,
				},
			},
			expected:          []string{"all", "default"},
			expectedNextToken: "dev-1",
		},
		{
			name: "paginate per-page=2 page=2",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage:   2,
					NextToken: "dev-1",
				},
			},
			expected:          []string{"dev-1", "dev-2"},
			expectedNextToken: "dev-no-meta",
		},
		{
			name: "paginate per-page=2 page=last",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage:   2,
					NextToken: "prod-2",
				},
			},
			expected:          []string{"prod-2"},
			expectedNextToken: "",
		},
		{
			name: "paginate reverse per-page=2 page=2",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage:   2,
					NextToken: "dev-no-meta",
					Reverse:   true,
				},
			},
			expected:          []string{"dev-no-meta", "dev-2"},
			expectedNextToken: "dev-1",
		},
		{
			name: "paginate prefix per-page=1 page=2",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage:   1,
					NextToken: "dev-2",
					Prefix:    "dev",
				},
			},
			expected:          []string{"dev-2"},
			expectedNextToken: "dev-no-meta",
		},
		{
			name: "paginate filter per-page=1 page=2",
			req: &structs.NodePoolListRequest{
				QueryOptions: structs.QueryOptions{
					PerPage:   1,
					NextToken: "dev-2",
					Filter:    "Meta is not empty",
				},
			},
			expected:          []string{"dev-2"},
			expectedNextToken: "prod-1",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Every request targets the global region.
			tc.req.Region = "global"

			var resp structs.NodePoolListResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.List", tc.req, &resp)

			// Error cases only assert on the error text.
			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				return
			}
			must.NoError(t, err)

			// Compare the returned pools by name, in response order.
			names := make([]string, 0, len(resp.NodePools))
			for _, pool := range resp.NodePools {
				names = append(names, pool.Name)
			}
			must.Eq(t, tc.expected, names)
			must.Eq(t, tc.expectedNextToken, resp.NextToken)
		})
	}
}
// TestNodePoolEndpoint_List_ACL checks which node pools the NodePool.List RPC
// returns for tokens carrying different node pool policies on an ACL-enabled
// server.
func TestNodePoolEndpoint_List_ACL(t *testing.T) {
	ci.Parallel(t)

	srv, root, shutdown := TestACLServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Seed the state store with two dev pools and one prod pool.
	seed := []*structs.NodePool{
		{
			Name:        "dev-1",
			Description: "test node pool for dev-1",
			Meta: map[string]string{
				"env":   "dev",
				"index": "1",
			},
		},
		{
			Name:        "dev-2",
			Description: "test node pool for dev-2",
			Meta: map[string]string{
				"env":   "dev",
				"index": "2",
			},
		},
		{
			Name:        "prod-1",
			Description: "test node pool for prod-1",
			Meta: map[string]string{
				"env":   "prod",
				"index": "1",
			},
		},
	}
	must.NoError(t, srv.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, seed))

	// Tokens used by the cases below.
	devToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1001, "dev-node-pools",
		mock.NodePoolPolicy("dev-*", "read", nil),
	)
	prodToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1003, "prod-node-pools",
		mock.NodePoolPolicy("prod-*", "read", nil),
	)
	noPolicyToken := mock.CreateToken(t, srv.fsm.State(), 1005, nil)
	allPoolsToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1007, "all-node-pools",
		mock.NodePoolPolicy("*", "read", nil),
	)

	testCases := []struct {
		name     string
		token    string
		expected []string
	}{
		{
			name:  "management token lists all",
			token: root.SecretID,
			expected: []string{
				"all", "default",
				"dev-1", "dev-2", "prod-1",
			},
		},
		{
			name:     "dev token lists dev",
			token:    devToken.SecretID,
			expected: []string{"dev-1", "dev-2"},
		},
		{
			name:     "prod token lists prod",
			token:    prodToken.SecretID,
			expected: []string{"prod-1"},
		},
		{
			name:  "all pools token lists all",
			token: allPoolsToken.SecretID,
			expected: []string{
				"all", "default",
				"dev-1", "dev-2", "prod-1",
			},
		},
		{
			name:     "no policy token",
			token:    noPolicyToken.SecretID,
			expected: []string{},
		},
		{
			name:     "no token",
			token:    "",
			expected: []string{},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				req = &structs.NodePoolListRequest{
					QueryOptions: structs.QueryOptions{
						Region:    "global",
						AuthToken: tc.token,
					},
				}
				resp structs.NodePoolListResponse
			)

			// The list call itself is expected to succeed for every token;
			// only the returned set of pools differs.
			must.NoError(t, msgpackrpc.CallWithCodec(codec, "NodePool.List", req, &resp))

			names := make([]string, 0, len(resp.NodePools))
			for _, pool := range resp.NodePools {
				names = append(names, pool.Name)
			}
			must.Eq(t, tc.expected, names)
		})
	}
}
// TestNodePoolEndpoint_List_BlockingQuery issues NodePool.List requests with a
// MinQueryIndex and asserts that the response index reflects a node pool
// insert and then a node pool delete performed on a delayed timer.
func TestNodePoolEndpoint_List_BlockingQuery(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	store := s.fsm.State()

	// The delayed state writes run on timer goroutines. Their errors are sent
	// back over a buffered channel so they can be asserted on the test
	// goroutine instead of being silently dropped.
	writeErr := make(chan error, 1)

	// Insert triggers watchers.
	pool := mock.NodePool()
	time.AfterFunc(100*time.Millisecond, func() {
		writeErr <- store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
	})

	req := &structs.NodePoolListRequest{
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 999,
		},
	}
	var resp structs.NodePoolListResponse
	err := msgpackrpc.CallWithCodec(codec, "NodePool.List", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1000, resp.Index)
	must.NoError(t, <-writeErr)

	// Delete triggers watchers.
	time.AfterFunc(100*time.Millisecond, func() {
		writeErr <- store.DeleteNodePools(structs.MsgTypeTestSetup, 1001, []string{pool.Name})
	})

	req.MinQueryIndex = 1000
	err = msgpackrpc.CallWithCodec(codec, "NodePool.List", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1001, resp.Index)
	must.NoError(t, <-writeErr)
}
// TestNodePoolEndpoint_GetNodePool fetches node pools by name through the
// NodePool.GetNodePool RPC and expects a nil pool, without an error, when the
// name is unknown or empty.
func TestNodePoolEndpoint_GetNodePool(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Store a single mock pool to look up.
	pool := mock.NodePool()
	must.NoError(t, srv.fsm.State().UpsertNodePools(
		structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}))

	testCases := []struct {
		name     string
		pool     string
		expected *structs.NodePool
	}{
		{
			name:     "get pool",
			pool:     pool.Name,
			expected: pool,
		},
		{
			name:     "non-existing",
			pool:     "does-not-exist",
			expected: nil,
		},
		{
			name:     "empty",
			pool:     "",
			expected: nil,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				req = &structs.NodePoolSpecificRequest{
					QueryOptions: structs.QueryOptions{
						Region: "global",
					},
					Name: tc.pool,
				}
				resp structs.SingleNodePoolResponse
			)

			must.NoError(t, msgpackrpc.CallWithCodec(codec, "NodePool.GetNodePool", req, &resp))
			must.Eq(t, tc.expected, resp.NodePool)
		})
	}
}
// TestNodePoolEndpoint_GetNodePool_ACL checks that NodePool.GetNodePool on an
// ACL-enabled server returns the pool for tokens with read access and a
// permission denied error otherwise.
func TestNodePoolEndpoint_GetNodePool_ACL(t *testing.T) {
	ci.Parallel(t)

	srv, root, shutdown := TestACLServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Store a single mock pool to look up.
	pool := mock.NodePool()
	must.NoError(t, srv.fsm.State().UpsertNodePools(
		structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}))

	// Tokens used by the cases below.
	allowedToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1001, "allow",
		mock.NodePoolPolicy(pool.Name, "read", nil),
	)
	deniedToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1003, "deny",
		mock.NodePoolPolicy(pool.Name, "deny", nil),
	)
	noPolicyToken := mock.CreateToken(t, srv.fsm.State(), 1005, nil)
	allPoolsToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1007, "all-node-pools",
		mock.NodePoolPolicy("*", "read", nil),
	)

	testCases := []struct {
		name        string
		token       string
		pool        string
		expectedErr string
		expected    string
	}{
		{
			name:     "management token",
			token:    root.SecretID,
			pool:     pool.Name,
			expected: pool.Name,
		},
		{
			name:     "allowed token",
			token:    allowedToken.SecretID,
			pool:     pool.Name,
			expected: pool.Name,
		},
		{
			name:     "all pools token",
			token:    allPoolsToken.SecretID,
			pool:     pool.Name,
			expected: pool.Name,
		},
		{
			name:        "denied token",
			token:       deniedToken.SecretID,
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no policy token",
			token:       noPolicyToken.SecretID,
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "invalid token",
			token:       "invalid",
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no token",
			token:       "",
			pool:        pool.Name,
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				req = &structs.NodePoolSpecificRequest{
					QueryOptions: structs.QueryOptions{
						Region:    "global",
						AuthToken: tc.token,
					},
					Name: tc.pool,
				}
				resp structs.SingleNodePoolResponse
			)
			err := msgpackrpc.CallWithCodec(codec, "NodePool.GetNodePool", req, &resp)

			// A rejected request must not carry a pool in the response.
			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				must.Nil(t, resp.NodePool)
				return
			}

			must.NoError(t, err)
			must.Eq(t, tc.expected, resp.NodePool.Name)
		})
	}
}
// TestNodePoolEndpoint_GetNodePool_BlockingQuery issues NodePool.GetNodePool
// requests with a MinQueryIndex while node pools are written on delayed
// timers. It asserts that the response index is the one from the write to the
// requested pool, not from the writes to the other pool.
func TestNodePoolEndpoint_GetNodePool_BlockingQuery(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	store := s.fsm.State()

	// The delayed state writes run on timer goroutines. Their errors are sent
	// back over a buffered channel so they can be asserted on the test
	// goroutine instead of being silently dropped. The buffer holds the
	// largest batch of timers scheduled at once.
	writeErrs := make(chan error, 3)

	// Upsert triggers watchers.
	// Populate state with a node pool.
	pool1 := mock.NodePool()
	time.AfterFunc(100*time.Millisecond, func() {
		writeErrs <- store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool1})
	})

	// Insert node pool that should not trigger watcher.
	pool2 := mock.NodePool()
	time.AfterFunc(200*time.Millisecond, func() {
		writeErrs <- store.UpsertNodePools(structs.MsgTypeTestSetup, 1001, []*structs.NodePool{pool2})
	})

	// Update first node pool to trigger watcher. The update lives in its own
	// variable: the first timer's closure captures pool1, so reassigning pool1
	// here would make the initial insert write the updated copy.
	pool1Updated := pool1.Copy()
	pool1Updated.Meta["updated"] = "true"
	time.AfterFunc(300*time.Millisecond, func() {
		writeErrs <- store.UpsertNodePools(structs.MsgTypeTestSetup, 1002, []*structs.NodePool{pool1Updated})
	})

	req := &structs.NodePoolSpecificRequest{
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 1000,
		},
		Name: pool1.Name,
	}
	var resp structs.SingleNodePoolResponse
	err := msgpackrpc.CallWithCodec(codec, "NodePool.GetNodePool", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1002, resp.Index)
	for i := 0; i < 3; i++ {
		must.NoError(t, <-writeErrs)
	}

	// Delete triggers watchers.
	// Delete pool that is not being watched.
	time.AfterFunc(100*time.Millisecond, func() {
		writeErrs <- store.DeleteNodePools(structs.MsgTypeTestSetup, 1003, []string{pool2.Name})
	})

	// Delete pool that is being watched.
	time.AfterFunc(200*time.Millisecond, func() {
		writeErrs <- store.DeleteNodePools(structs.MsgTypeTestSetup, 1004, []string{pool1.Name})
	})

	req.MinQueryIndex = 1002
	err = msgpackrpc.CallWithCodec(codec, "NodePool.GetNodePool", req, &resp)
	must.NoError(t, err)
	must.Eq(t, 1004, resp.Index)
	for i := 0; i < 2; i++ {
		must.NoError(t, <-writeErrs)
	}
}
// TestNodePoolEndpoint_UpsertNodePools drives the NodePool.UpsertNodePools RPC
// with valid inserts and updates as well as invalid requests, and verifies
// that accepted pools can be read back from the state store.
func TestNodePoolEndpoint_UpsertNodePools(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Store a pool up front so one case can update it.
	existing := mock.NodePool()
	must.NoError(t, srv.fsm.State().UpsertNodePools(
		structs.MsgTypeTestSetup, 1000, []*structs.NodePool{existing}))

	testCases := []struct {
		name        string
		pools       []*structs.NodePool
		expectedErr string
	}{
		{
			name: "insert new pool",
			pools: []*structs.NodePool{
				mock.NodePool(),
			},
		},
		{
			name: "insert multiple pools",
			pools: []*structs.NodePool{
				mock.NodePool(),
				mock.NodePool(),
			},
		},
		{
			name: "update pool",
			pools: []*structs.NodePool{
				{
					Name:        existing.Name,
					Description: "updated pool",
					Meta: map[string]string{
						"updated": "true",
					},
				},
			},
		},
		{
			name: "invalid pool name",
			pools: []*structs.NodePool{
				{
					Name: "%invalid%",
				},
			},
			expectedErr: "invalid node pool",
		},
		{
			name: "missing pool name",
			pools: []*structs.NodePool{
				{
					Name:        "",
					Description: "no name",
				},
			},
			expectedErr: "invalid node pool",
		},
		{
			name:        "empty request",
			pools:       []*structs.NodePool{},
			expectedErr: "must specify at least one node pool",
		},
		{
			name: "fail to update built-in pool all",
			pools: []*structs.NodePool{
				{
					Name:        structs.NodePoolAll,
					Description: "trying to update built-in pool",
				},
			},
			expectedErr: "not allowed",
		},
		{
			name: "fail to update built-in pool default",
			pools: []*structs.NodePool{
				{
					Name:        structs.NodePoolDefault,
					Description: "trying to update built-in pool",
				},
			},
			expectedErr: "not allowed",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				req = &structs.NodePoolUpsertRequest{
					WriteRequest: structs.WriteRequest{
						Region: "global",
					},
					NodePools: tc.pools,
				}
				resp structs.GenericResponse
			)
			err := msgpackrpc.CallWithCodec(codec, "NodePool.UpsertNodePools", req, &resp)

			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				return
			}
			must.NoError(t, err)

			// Each submitted pool must match what is stored, apart from the
			// hash and index fields, which the request does not set.
			for _, want := range tc.pools {
				ws := memdb.NewWatchSet()
				stored, err := srv.fsm.State().NodePoolByName(ws, want.Name)
				must.NoError(t, err)
				must.Eq(t, want, stored, must.Cmp(cmpopts.IgnoreFields(
					structs.NodePool{},
					"Hash",
					"CreateIndex",
					"ModifyIndex",
				)))
			}
		})
	}
}
// TestNodePoolEndpoint_UpsertNodePool_ACL checks that the
// NodePool.UpsertNodePools RPC on an ACL-enabled server accepts requests only
// when the token grants write access to every pool in the request.
func TestNodePoolEndpoint_UpsertNodePool_ACL(t *testing.T) {
	ci.Parallel(t)

	srv, root, shutdown := TestACLServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Tokens used by the cases below.
	devToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1001, "dev-node-pools",
		mock.NodePoolPolicy("dev-*", "write", nil),
	)
	devSpecificToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1003, "dev-1-node-pools",
		mock.NodePoolPolicy("dev-1", "write", nil),
	)
	prodToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1005, "prod-node-pools",
		mock.NodePoolPolicy("prod-*", "", []string{"write"}),
	)
	noPolicyToken := mock.CreateToken(t, srv.fsm.State(), 1007, nil)
	readOnlyToken := mock.CreatePolicyAndToken(t, srv.fsm.State(), 1009, "node-pools-read-only",
		mock.NodePoolPolicy("*", "read", nil),
	)

	testCases := []struct {
		name        string
		token       string
		pools       []*structs.NodePool
		expectedErr string
	}{
		{
			name:  "management token has full access",
			token: root.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},
				{Name: "prod-1"},
				{Name: "qa-1"},
			},
		},
		{
			name:  "allowed by policy",
			token: devToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},
			},
		},
		{
			name:  "allowed by capability",
			token: prodToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "prod-1"},
			},
		},
		{
			name:  "allowed by exact match",
			token: devSpecificToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},
			},
		},
		{
			name:  "token restricted to wildcard",
			token: devToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},  // ok
				{Name: "prod-1"}, // not ok
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:  "token restricted if not exact match",
			token: devSpecificToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-2"},
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:  "no token",
			token: "",
			pools: []*structs.NodePool{
				{Name: "dev-2"},
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:  "no policy",
			token: noPolicyToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:  "no write",
			token: readOnlyToken.SecretID,
			pools: []*structs.NodePool{
				{Name: "dev-1"},
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				req = &structs.NodePoolUpsertRequest{
					WriteRequest: structs.WriteRequest{
						Region:    "global",
						AuthToken: tc.token,
					},
					NodePools: tc.pools,
				}
				resp structs.GenericResponse
			)
			err := msgpackrpc.CallWithCodec(codec, "NodePool.UpsertNodePools", req, &resp)

			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				return
			}
			must.NoError(t, err)

			// Each submitted pool must match what is stored, apart from the
			// hash and index fields, which the request does not set.
			for _, want := range tc.pools {
				ws := memdb.NewWatchSet()
				stored, err := srv.fsm.State().NodePoolByName(ws, want.Name)
				must.NoError(t, err)
				must.Eq(t, want, stored, must.Cmp(cmpopts.IgnoreFields(
					structs.NodePool{},
					"Hash",
					"CreateIndex",
					"ModifyIndex",
				)))
			}
		})
	}
}
// TestNodePoolEndpoint_DeleteNodePools drives the NodePool.DeleteNodePools RPC
// with deletable pools, pools that hold a node or a running job, and invalid
// requests, and verifies that deleted pools are gone from the state store.
func TestNodePoolEndpoint_DeleteNodePools(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	store := srv.fsm.State()
	testutil.WaitForLeader(t, srv.RPC)

	// Ten mock pools for the cases to delete.
	pools := make([]*structs.NodePool, 10)
	for i := range pools {
		pools[i] = mock.NodePool()
	}
	must.NoError(t, store.UpsertNodePools(structs.MsgTypeTestSetup, 100, pools))

	// pools[3] holds a node and pools[4] holds a running job, so the cases
	// below expect deleting either of them to be rejected.
	node := mock.Node()
	node.NodePool = pools[3].Name
	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 101, node))

	job := mock.MinJob()
	job.NodePool = pools[4].Name
	job.Status = structs.JobStatusRunning
	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job))

	testCases := []struct {
		name        string
		pools       []string
		expectedErr string
	}{
		{
			name:  "delete existing pool",
			pools: []string{pools[0].Name},
		},
		{
			name: "delete multiple pools",
			pools: []string{
				pools[1].Name,
				pools[2].Name,
			},
		},
		{
			name:  "delete pool occupied by node",
			pools: []string{pools[3].Name},
			expectedErr: fmt.Sprintf(
				"node pool %q has nodes in regions: [global]", pools[3].Name),
		},
		{
			name:  "delete pool occupied by job",
			pools: []string{pools[4].Name},
			expectedErr: fmt.Sprintf(
				"node pool %q has non-terminal jobs in regions: [global]", pools[4].Name),
		},
		{
			name:        "pool doesn't exist",
			pools:       []string{"doesnt-exist"},
			expectedErr: "not found",
		},
		{
			name:        "empty request",
			pools:       []string{},
			expectedErr: "must specify at least one node pool to delete",
		},
		{
			name:        "empty name",
			pools:       []string{""},
			expectedErr: "node pool name is empty",
		},
		{
			name:        "can't delete built-in pool all",
			pools:       []string{"all"},
			expectedErr: "not allowed",
		},
		{
			name:        "can't delete built-in pool default",
			pools:       []string{"default"},
			expectedErr: "not allowed",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Re-insert every pool so each case starts from the same state.
			must.NoError(t, store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools))

			var (
				req = &structs.NodePoolDeleteRequest{
					WriteRequest: structs.WriteRequest{
						Region: "global",
					},
					Names: tc.pools,
				}
				resp structs.GenericResponse
			)
			err := msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp)

			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				return
			}
			must.NoError(t, err)

			// Deleted pools must no longer be found in the state store.
			for _, name := range tc.pools {
				ws := memdb.NewWatchSet()
				stored, err := store.NodePoolByName(ws, name)
				must.NoError(t, err)
				must.Nil(t, stored)
			}
		})
	}
}
// TestNodePoolEndpoint_DeleteNodePools_ACL checks that the
// NodePool.DeleteNodePools RPC on an ACL-enabled server accepts requests only
// when the token may delete every named pool, and that pools holding a node
// or a running job are rejected even for the management token.
func TestNodePoolEndpoint_DeleteNodePools_ACL(t *testing.T) {
	ci.Parallel(t)

	srv, root, shutdown := TestACLServer(t, nil)
	defer shutdown()
	codec := rpcClient(t, srv)
	store := srv.fsm.State()
	testutil.WaitForLeader(t, srv.RPC)

	// Tokens used by the cases below.
	devToken := mock.CreatePolicyAndToken(t, store, 100, "dev-node-pools",
		mock.NodePoolPolicy("dev-*", "write", nil),
	)
	devSpecificToken := mock.CreatePolicyAndToken(t, store, 102, "dev-1-node-pools",
		mock.NodePoolPolicy("dev-1", "write", nil),
	)
	prodToken := mock.CreatePolicyAndToken(t, store, 104, "prod-node-pools",
		mock.NodePoolPolicy("prod-*", "", []string{"delete"}),
	)
	noPolicyToken := mock.CreateToken(t, store, 106, nil)
	noDeleteToken := mock.CreatePolicyAndToken(t, store, 107, "node-pools-no-delete",
		mock.NodePoolPolicy("*", "", []string{"read", "write"}),
	)

	// Five pools each for dev, prod, and qa, appended in that order per index.
	var pools []*structs.NodePool
	addPool := func(name string) {
		pool := mock.NodePool()
		pool.Name = name
		pools = append(pools, pool)
	}
	for i := 0; i < 5; i++ {
		addPool(fmt.Sprintf("dev-%d", i))
		addPool(fmt.Sprintf("prod-%d", i))
		addPool(fmt.Sprintf("qa-%d", i))
	}
	must.NoError(t, store.UpsertNodePools(structs.MsgTypeTestSetup, 108, pools))

	// prod-3 holds a node and prod-4 holds a running job, so the cases below
	// expect deleting either of them to be rejected.
	node := mock.Node()
	node.NodePool = "prod-3"
	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 109, node))

	job := mock.MinJob()
	job.NodePool = "prod-4"
	job.Status = structs.JobStatusRunning
	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 110, nil, job))

	testCases := []struct {
		name        string
		token       string
		pools       []string
		expectedErr string
	}{
		{
			name:  "management token has full access",
			token: root.SecretID,
			pools: []string{
				"dev-1",
				"prod-1",
				"qa-1",
			},
		},
		{
			name:  "allowed by write policy",
			token: devToken.SecretID,
			pools: []string{"dev-1"},
		},
		{
			name:  "allowed by delete capability",
			token: prodToken.SecretID,
			pools: []string{"prod-1"},
		},
		{
			name:  "allowed by exact match",
			token: devSpecificToken.SecretID,
			pools: []string{"dev-1"},
		},
		{
			name:  "restricted by wildcard",
			token: devToken.SecretID,
			pools: []string{
				"dev-1",  // ok
				"prod-1", // not ok
			},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "restricted if not exact match",
			token:       devSpecificToken.SecretID,
			pools:       []string{"dev-2"},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no token",
			token:       "",
			pools:       []string{"dev-1"},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no policy",
			token:       noPolicyToken.SecretID,
			pools:       []string{"dev-1"},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no delete",
			token:       noDeleteToken.SecretID,
			pools:       []string{"dev-1"},
			expectedErr: structs.ErrPermissionDenied.Error(),
		},
		{
			name:        "no delete pool occupied by node",
			token:       root.SecretID,
			pools:       []string{"prod-3"},
			expectedErr: "node pool \"prod-3\" has nodes in regions: [global]",
		},
		{
			name:        "no delete pool occupied by job",
			token:       root.SecretID,
			pools:       []string{"prod-4"},
			expectedErr: "node pool \"prod-4\" has non-terminal jobs in regions: [global]",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Re-insert every pool so each case starts from the same state.
			must.NoError(t, store.UpsertNodePools(structs.MsgTypeTestSetup, 1000, pools))

			var (
				req = &structs.NodePoolDeleteRequest{
					WriteRequest: structs.WriteRequest{
						Region:    "global",
						AuthToken: tc.token,
					},
					Names: tc.pools,
				}
				resp structs.GenericResponse
			)
			err := msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp)

			if tc.expectedErr != "" {
				must.ErrorContains(t, err, tc.expectedErr)
				return
			}
			must.NoError(t, err)

			// Deleted pools must no longer be found in the state store.
			for _, name := range tc.pools {
				ws := memdb.NewWatchSet()
				stored, err := store.NodePoolByName(ws, name)
				must.NoError(t, err)
				must.Nil(t, stored)
			}
		})
	}
}
// TestNodePoolEndpoint_DeleteNodePools_NonLocal joins two ACL-enabled servers
// in different regions, with region1 authoritative, and asserts that deleting
// a node pool through region1 fails while region2 has a non-terminal job in
// that pool and succeeds once the job is stopped.
func TestNodePoolEndpoint_DeleteNodePools_NonLocal(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.Region = "region1"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)

	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
		c.Region = "region2"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
		c.ReplicationBackoff = 20 * time.Millisecond
		c.ReplicationToken = root.SecretID
	})
	defer cleanupS2()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Write a node pool to the authoritative region
	np1 := mock.NodePool()
	index, err := s1.State().LatestIndex() // we need indexes to be correct here
	must.NoError(t, err)
	must.NoError(t, s1.State().UpsertNodePools(
		structs.MsgTypeTestSetup, index, []*structs.NodePool{np1}))

	// Wait for the node pool to replicate
	testutil.WaitForResult(func() (bool, error) {
		store := s2.State()
		out, err := store.NodePoolByName(nil, np1.Name)
		return out != nil, err
	}, func(err error) {
		// Include the last lookup error so a replication failure is diagnosable.
		t.Fatalf("should replicate node pool: %v", err)
	})

	// Create a job in the node pool on the non-authoritative region
	job := mock.SystemJob()
	job.NodePool = np1.Name
	index, err = s1.State().LatestIndex()
	must.NoError(t, err)
	must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))

	// Deleting the node pool should fail
	req := &structs.NodePoolDeleteRequest{
		Names: []string{np1.Name},
		WriteRequest: structs.WriteRequest{
			Region:    s1.Region(),
			AuthToken: root.SecretID,
		},
	}
	var resp structs.GenericResponse
	err = msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp)
	must.ErrorContains(t, err, fmt.Sprintf(
		"node pool %q has non-terminal jobs in regions: [%s]", np1.Name, s2.Region()))

	// Stop the job and now deleting the node pool will work
	job = job.Copy()
	job.Stop = true
	index, err = s1.State().LatestIndex()
	must.NoError(t, err)
	index++
	must.NoError(t, s2.State().UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "NodePool.DeleteNodePools", req, &resp))

	// Wait for the node pool deletion to replicate
	testutil.WaitForResult(func() (bool, error) {
		store := s2.State()
		out, err := store.NodePoolByName(nil, np1.Name)
		return out == nil, err
	}, func(err error) {
		t.Fatalf("should replicate node pool deletion: %v", err)
	})
}
// TestNodePoolEndpoint_ListJobs_ACLs registers one job per namespace and node
// pool combination and checks which of them the NodePool.ListJobs RPC returns
// for no token, the management token, and a token limited to one namespace.
func TestNodePoolEndpoint_ListJobs_ACLs(t *testing.T) {
	ci.Parallel(t)

	srv, root, shutdown := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()
	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()
	index := uint64(1000)

	// Seed the state store with a dev pool and a prod pool.
	must.NoError(t, store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{
		{
			Name:        "dev-1",
			Description: "test node pool for dev-1",
		},
		{
			Name:        "prod-1",
			Description: "test node pool for prod-1",
		},
	}))

	// Job IDs keyed by "<namespace>+<pool>", for referring to jobs in
	// assertions.
	jobIDs := map[string]string{}

	// Register a job in every pool of every namespace.
	namespaces := []string{"engineering", "system", "default"}
	poolNames := []string{"dev-1", "prod-1", "default"}
	for _, ns := range namespaces {
		index++
		must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{{Name: ns}}))

		for _, pool := range poolNames {
			job := mock.MinJob()
			job.Namespace = ns
			job.NodePool = pool
			jobIDs[ns+"+"+pool] = job.ID

			index++
			must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
		}
	}

	req := &structs.NodePoolJobsRequest{
		Name: "dev-1",
		QueryOptions: structs.QueryOptions{
			Region:    "global",
			Namespace: structs.AllNamespacesSentinel,
		},
	}

	// The same response value is reused for every call below.
	var resp structs.NodePoolJobsResponse
	listJobs := func() error {
		return msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
	}

	// A request without a token is rejected.
	must.EqError(t, listJobs(), structs.ErrPermissionDenied.Error())

	// The management token sees the dev-1 jobs of every namespace.
	req.AuthToken = root.SecretID
	must.NoError(t, listJobs())
	must.Len(t, 3, resp.Jobs)
	must.SliceContainsAll(t,
		helper.ConvertSlice(resp.Jobs, func(j *structs.JobListStub) string { return j.ID }),
		[]string{jobIDs["engineering+dev-1"], jobIDs["system+dev-1"], jobIDs["default+dev-1"]})

	// Token with read access to the dev-* and default pools but only to the
	// engineering namespace.
	index++
	devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools",
		fmt.Sprintf("%s\n%s\n%s\n",
			mock.NodePoolPolicy("dev-*", "read", nil),
			mock.NodePoolPolicy("default", "read", nil),
			mock.NamespacePolicy("engineering", "read", nil)),
	)
	req.AuthToken = devToken.SecretID

	// Wildcard namespace: only the engineering job is returned.
	must.NoError(t, listJobs())
	must.Len(t, 1, resp.Jobs)
	must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID)

	// Explicitly allowed namespace.
	req.Namespace = "engineering"
	must.NoError(t, listJobs())
	must.Len(t, 1, resp.Jobs)
	must.Eq(t, jobIDs["engineering+dev-1"], resp.Jobs[0].ID)

	// Disallowed namespace: no error, but no jobs either.
	req.Namespace = "system"
	must.NoError(t, listJobs())
	must.Len(t, 0, resp.Jobs)

	// Allowed namespace but disallowed pool is rejected.
	req.Namespace = "engineering"
	req.Name = "prod-1"
	must.EqError(t, listJobs(), structs.ErrPermissionDenied.Error())
}
// TestNodePoolEndpoint_ListJobs_Blocking issues NodePool.ListJobs requests
// with a MinQueryIndex and MaxQueryTime while a job is moved into, and then
// out of, the requested node pool on a delayed timer, and asserts on the jobs
// and index returned.
func TestNodePoolEndpoint_ListJobs_Blocking(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	store := s1.fsm.State()
	index := uint64(1000)

	// Populate state with a node pool and a job in the default pool
	poolDev := &structs.NodePool{
		Name:        "dev-1",
		Description: "test node pool for dev-1",
	}
	must.NoError(t, store.UpsertNodePools(
		structs.MsgTypeTestSetup, index, []*structs.NodePool{poolDev}))

	job := mock.MinJob()
	index++
	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))

	req := &structs.NodePoolJobsRequest{
		Name: "default",
		QueryOptions: structs.QueryOptions{
			Region:    "global",
			Namespace: structs.AllNamespacesSentinel,
		},
	}

	// upsertLater writes j at idx after a short delay and reports the result
	// on the returned channel. The write runs on a timer goroutine, where
	// fatal test assertions must not be called, so the error is handed back
	// to be checked on the test goroutine. Receiving from the channel also
	// orders the write before any later change to index or the job.
	upsertLater := func(idx uint64, j *structs.Job) <-chan error {
		errCh := make(chan error, 1)
		time.AfterFunc(100*time.Millisecond, func() {
			errCh <- store.UpsertJob(structs.MsgTypeTestSetup, idx, nil, j)
		})
		return errCh
	}

	// List the job and get the index
	var resp structs.NodePoolJobsResponse
	err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
	must.NoError(t, err)
	must.Len(t, 1, resp.Jobs)
	must.Eq(t, index, resp.Index)
	must.Eq(t, "default", resp.Jobs[0].NodePool)

	// Moving a job into a pool we're watching should trigger a watch
	index++
	jobInDev := job.Copy()
	jobInDev.NodePool = "dev-1"
	upsertErr := upsertLater(index, jobInDev)

	req.Name = "dev-1"
	req.MinQueryIndex = index
	req.MaxQueryTime = 500 * time.Millisecond
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
	must.NoError(t, err)
	must.Len(t, 1, resp.Jobs)
	must.Eq(t, index, resp.Index)
	must.NoError(t, <-upsertErr)

	// Moving a job out of a pool we're watching should trigger a watch
	index++
	jobInDefault := jobInDev.Copy()
	jobInDefault.NodePool = "default"
	upsertErr = upsertLater(index, jobInDefault)

	req.Name = "dev-1"
	req.MinQueryIndex = index
	req.MaxQueryTime = 500 * time.Millisecond
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
	must.NoError(t, err)
	must.Len(t, 0, resp.Jobs)
	must.Eq(t, index, resp.Index)
	must.NoError(t, <-upsertErr)
}
// TestNodePoolEndpoint_ListJobs_PaginationFiltering is a table-driven test of
// NodePool.ListJobs against an ACL-enabled server. Each case sets some
// combination of node pool, namespace, go-bexpr filter, page size, and next
// token, and asserts on the set of job IDs in the returned page and on the
// returned NextToken (or on the error message).
func TestNodePoolEndpoint_ListJobs_PaginationFiltering(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	store := s1.fsm.State()
	index := uint64(1000)
	var err error

	// Populate state with some node pools.
	poolDev := &structs.NodePool{
		Name:        "dev-1",
		Description: "test node pool for dev-1",
	}
	poolProd := &structs.NodePool{
		Name:        "prod-1",
		Description: "test node pool for prod-1",
	}
	err = store.UpsertNodePools(structs.MsgTypeTestSetup, index, []*structs.NodePool{
		poolDev,
		poolProd,
	})
	must.NoError(t, err)

	index++
	must.NoError(t, store.UpsertNamespaces(index,
		[]*structs.Namespace{{Name: "non-default"}, {Name: "other"}}))

	// create a set of jobs. these are in the order that the state store will
	// return them from the iterator (sorted by key) for ease of writing tests
	mocks := []struct {
		name      string
		pool      string
		namespace string
		status    string
	}{
		{name: "job-00", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
		{name: "job-01", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
		{name: "job-02", pool: "default", namespace: "default", status: structs.JobStatusPending},
		{name: "job-03", pool: "dev-1", namespace: "non-default", status: structs.JobStatusPending},
		{name: "job-04", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning},
		{name: "job-05", pool: "dev-1", namespace: "default", status: structs.JobStatusRunning},
		{name: "job-06", pool: "dev-1", namespace: "other", status: structs.JobStatusPending},
		// job-07 is missing for missing index assertion
		{name: "job-08", pool: "prod-1", namespace: "default", status: structs.JobStatusRunning},
		{name: "job-09", pool: "prod-1", namespace: "non-default", status: structs.JobStatusPending},
		{name: "job-10", pool: "dev-1", namespace: "default", status: structs.JobStatusPending},
		{name: "job-11", pool: "all", namespace: "default", status: structs.JobStatusPending},
		{name: "job-12", pool: "all", namespace: "default", status: structs.JobStatusPending},
		{name: "job-13", pool: "all", namespace: "non-default", status: structs.JobStatusPending},
	}
	for _, m := range mocks {
		job := mock.MinJob()
		job.ID = m.name
		job.Name = m.name
		job.NodePool = m.pool
		job.Status = m.status
		job.Namespace = m.namespace
		index++
		job.CreateIndex = index
		must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
	}

	// Policy that allows access to 2 pools and any namespace
	index++
	devToken := mock.CreatePolicyAndToken(t, store, index, "dev-node-pools",
		fmt.Sprintf("%s\n%s\n%s\n",
			mock.NodePoolPolicy("dev-*", "read", nil),
			mock.NodePoolPolicy("default", "read", nil),
			mock.NamespacePolicy("*", "read", nil)),
	)

	cases := []struct {
		name              string
		pool              string
		namespace         string
		filter            string
		nextToken         string
		pageSize          int32
		aclToken          string
		expectedNextToken string
		expectedIDs       []string
		expectedError     string
	}{
		{
			name:        "test00 dev pool default NS",
			pool:        "dev-1",
			expectedIDs: []string{"job-00", "job-01", "job-04", "job-05", "job-10"},
		},
		{
			name:              "test01 size-2 page-1 dev pool default NS",
			pool:              "dev-1",
			pageSize:          2,
			expectedNextToken: "default.job-04",
			expectedIDs:       []string{"job-00", "job-01"},
		},
		{
			name:              "test02 size-2 page-1 dev pool wildcard NS",
			pool:              "dev-1",
			namespace:         "*",
			pageSize:          2,
			expectedNextToken: "default.job-04",
			expectedIDs:       []string{"job-00", "job-01"},
		},
		{
			name:              "test03 size-2 page-2 dev pool default NS",
			pool:              "dev-1",
			pageSize:          2,
			nextToken:         "default.job-04",
			expectedNextToken: "default.job-10",
			expectedIDs:       []string{"job-04", "job-05"},
		},
		{
			name:              "test04 size-2 page-2 wildcard NS",
			pool:              "dev-1",
			namespace:         "*",
			pageSize:          2,
			nextToken:         "default.job-04",
			expectedNextToken: "default.job-10",
			expectedIDs:       []string{"job-04", "job-05"},
		},
		{
			name:        "test05 no valid results with filters",
			pool:        "dev-1",
			pageSize:    2,
			nextToken:   "",
			filter:      `Name matches "not-job"`,
			expectedIDs: []string{},
		},
		{
			name:        "test06 go-bexpr filter across namespaces",
			pool:        "dev-1",
			namespace:   "*",
			filter:      `Name matches "job-0[12345]"`,
			expectedIDs: []string{"job-01", "job-04", "job-05", "job-03"},
		},
		{
			name:              "test07 go-bexpr filter with pagination",
			pool:              "dev-1",
			namespace:         "*",
			filter:            `Name matches "job-0[12345]"`,
			pageSize:          3,
			expectedNextToken: "non-default.job-03",
			expectedIDs:       []string{"job-01", "job-04", "job-05"},
		},
		{
			name:        "test08 go-bexpr filter in namespace",
			pool:        "dev-1",
			namespace:   "non-default",
			filter:      `Status == "pending"`,
			expectedIDs: []string{"job-03"},
		},
		{
			name:          "test09 go-bexpr invalid expression",
			pool:          "dev-1",
			filter:        `NotValid`,
			expectedError: "failed to read filter expression",
		},
		{
			name:          "test10 go-bexpr invalid field",
			pool:          "dev-1",
			filter:        `InvalidField == "value"`,
			expectedError: "error finding value in datum",
		},
		{
			name:        "test11 missing index",
			pool:        "dev-1",
			pageSize:    1,
			nextToken:   "default.job-07",
			expectedIDs: []string{"job-10"},
		},
		{
			name:          "test12 all pool wildcard NS",
			pool:          "all",
			namespace:     "*",
			pageSize:      4,
			expectedError: "Permission denied",
		},
		{
			name:        "test13 all pool wildcard NS",
			pool:        "all",
			namespace:   "*",
			aclToken:    root.SecretID,
			expectedIDs: []string{"job-11", "job-12", "job-13"},
		},
		{
			name:              "test14 all pool default NS",
			pool:              "all",
			pageSize:          1,
			aclToken:          root.SecretID,
			expectedNextToken: "default.job-12",
			expectedIDs:       []string{"job-11"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			req := &structs.NodePoolJobsRequest{
				Name: tc.pool,
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					Namespace: tc.namespace,
					Filter:    tc.filter,
					PerPage:   tc.pageSize,
					NextToken: tc.nextToken,
				},
			}
			// Cases use the dev token unless they supply their own.
			req.AuthToken = devToken.SecretID
			if tc.aclToken != "" {
				req.AuthToken = tc.aclToken
			}

			var resp structs.NodePoolJobsResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.ListJobs", req, &resp)
			if tc.expectedError != "" {
				must.Error(t, err)
				must.ErrorContains(t, err, tc.expectedError)
				return
			}
			must.NoError(t, err)

			// Compare as sets so the order of IDs within a page does not
			// matter, but require equality rather than containment so that
			// unexpected extra jobs in the page fail the test.
			got := set.FromFunc(resp.Jobs,
				func(j *structs.JobListStub) string { return j.ID })
			want := set.From(tc.expectedIDs)
			must.True(t, got.Equal(want),
				must.Sprintf("unexpected page of jobs: got %v, expected %v", got, want))
			must.Eq(t, tc.expectedNextToken, resp.QueryMeta.NextToken,
				must.Sprint("unexpected NextToken"))
		})
	}
}
// TestNodePoolEndpoint_ListNodes registers nine nodes spread evenly across the
// default pool and two mock pools, then runs table-driven NodePool.ListNodes
// requests covering plain listing, the built-in "all" pool, optional stub
// fields, filtering, and pagination.
func TestNodePoolEndpoint_ListNodes(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	state := s.fsm.State()

	// Two extra pools in addition to the default one.
	pool1 := mock.NodePool()
	pool2 := mock.NodePool()
	must.NoError(t, state.UpsertNodePools(
		structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool1, pool2}))

	// Nodes are assigned round-robin to the three pools. The ID prefix
	// identifies the pool and the two-digit suffix counts nodes in that pool.
	placements := []struct {
		idPrefix string
		pool     string
	}{
		{idPrefix: "00000000", pool: structs.NodePoolDefault},
		{idPrefix: "11111111", pool: pool1.Name},
		{idPrefix: "22222222", pool: pool2.Name},
	}
	// The OS alternates with the node counter: even is Windows, odd is Linux.
	osNames := []string{"Windows", "Linux"}

	for n := 0; n < 9; n++ {
		placement := placements[n%3]

		node := mock.Node()
		node.ID = fmt.Sprintf("%s-0000-0000-0000-0000000000%02d", placement.idPrefix, n/3)
		node.NodePool = placement.pool
		node.Attributes["os.name"] = osNames[n%2]

		must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node))
	}

	cases := []struct {
		name          string
		req           *structs.NodePoolNodesRequest
		wantErr       string
		want          []string
		wantNextToken string
	}{
		{
			name: "nodes in default",
			req: &structs.NodePoolNodesRequest{
				Name: structs.NodePoolDefault,
			},
			want: []string{
				"00000000-0000-0000-0000-000000000000",
				"00000000-0000-0000-0000-000000000001",
				"00000000-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in all",
			req: &structs.NodePoolNodesRequest{
				Name: structs.NodePoolAll,
			},
			want: []string{
				"00000000-0000-0000-0000-000000000000",
				"00000000-0000-0000-0000-000000000001",
				"00000000-0000-0000-0000-000000000002",
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
				"11111111-0000-0000-0000-000000000002",
				"22222222-0000-0000-0000-000000000000",
				"22222222-0000-0000-0000-000000000001",
				"22222222-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool1 with OS",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					OS: true,
				},
			},
			want: []string{
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
				"11111111-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool2 filtered by OS",
			req: &structs.NodePoolNodesRequest{
				Name: pool2.Name,
				QueryOptions: structs.QueryOptions{
					Filter: `Attributes["os.name"] == "Windows"`,
				},
			},
			want: []string{
				"22222222-0000-0000-0000-000000000000",
				"22222222-0000-0000-0000-000000000002",
			},
		},
		{
			name: "nodes in pool1 paginated with resources",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					Resources: true,
				},
				QueryOptions: structs.QueryOptions{
					PerPage: 2,
				},
			},
			want: []string{
				"11111111-0000-0000-0000-000000000000",
				"11111111-0000-0000-0000-000000000001",
			},
			wantNextToken: "11111111-0000-0000-0000-000000000002",
		},
		{
			name: "nodes in pool1 paginated with resources - page 2",
			req: &structs.NodePoolNodesRequest{
				Name: pool1.Name,
				Fields: &structs.NodeStubFields{
					Resources: true,
				},
				QueryOptions: structs.QueryOptions{
					PerPage:   2,
					NextToken: "11111111-0000-0000-0000-000000000002",
				},
			},
			want: []string{
				"11111111-0000-0000-0000-000000000002",
			},
			wantNextToken: "",
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			// Every request targets the global region.
			tt.req.Region = "global"

			var resp structs.NodePoolNodesResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", tt.req, &resp)

			// Error cases must not return any nodes.
			if tt.wantErr != "" {
				must.ErrorContains(t, err, tt.wantErr)
				must.SliceEmpty(t, resp.Nodes)
				return
			}
			must.NoError(t, err)

			// Compare the returned node IDs, in order, and the pagination token.
			ids := make([]string, 0, len(resp.Nodes))
			for _, stub := range resp.Nodes {
				ids = append(ids, stub.ID)
			}
			must.Eq(t, tt.want, ids)
			must.Eq(t, tt.wantNextToken, resp.NextToken)

			// Optional stub fields are only verified when they were requested.
			fields := tt.req.Fields
			if fields == nil {
				return
			}
			if fields.Resources {
				must.NotNil(t, resp.Nodes[0].NodeResources)
				must.NotNil(t, resp.Nodes[0].ReservedResources)
			}
			if fields.OS {
				must.NotEq(t, "", resp.Nodes[0].Attributes["os.name"])
			}
		})
	}
}
// TestNodePoolEndpoint_ListNodes_ACL checks which ACL tokens may call
// NodePool.ListNodes on an ACL-enabled server. One node is registered in pool
// "dev-1"; the table expects success for the management token and for a token
// with both node pool and node read policies, and permission denied otherwise.
func TestNodePoolEndpoint_ListNodes_ACL(t *testing.T) {
	ci.Parallel(t)

	s, root, cleanupS := TestACLServer(t, nil)
	defer cleanupS()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	state := s.fsm.State()

	// One pool with a single node registered in it.
	pool := mock.NodePool()
	pool.Name = "dev-1"
	must.NoError(t, state.UpsertNodePools(
		structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}))

	node := mock.Node()
	node.NodePool = pool.Name
	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node))

	// Tokens with different combinations of node pool and node policies.
	poolReadPolicy := mock.NodePoolPolicy("dev-*", "read", nil)
	nodeReadPolicy := mock.NodePolicy("read")

	validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid",
		fmt.Sprintf("%s\n%s", poolReadPolicy, nodeReadPolicy),
	)
	poolOnlyToken := mock.CreatePolicyAndToken(t, state, 1004, "pool-only",
		mock.NodePoolPolicy("dev-*", "read", nil),
	)
	nodeOnlyToken := mock.CreatePolicyAndToken(t, state, 1006, "node-only",
		mock.NodePolicy("read"),
	)
	noPolicyToken := mock.CreateToken(t, state, 1008, nil)

	denied := structs.ErrPermissionDenied.Error()

	cases := []struct {
		name    string
		pool    string
		token   string
		want    []string
		wantErr string
	}{
		{
			name:  "management token is allowed",
			token: root.SecretID,
			pool:  pool.Name,
			want:  []string{node.ID},
		},
		{
			name:  "valid token is allowed",
			token: validToken.SecretID,
			pool:  pool.Name,
			want:  []string{node.ID},
		},
		{
			name:    "pool only not enough",
			token:   poolOnlyToken.SecretID,
			pool:    pool.Name,
			wantErr: denied,
		},
		{
			name:    "node only not enough",
			token:   nodeOnlyToken.SecretID,
			pool:    pool.Name,
			wantErr: denied,
		},
		{
			name:    "no policy not allowed",
			token:   noPolicyToken.SecretID,
			pool:    pool.Name,
			wantErr: denied,
		},
		{
			name:    "token not allowed for pool",
			token:   validToken.SecretID,
			pool:    structs.NodePoolDefault,
			wantErr: denied,
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			// Make node pool nodes request with the token under test.
			req := &structs.NodePoolNodesRequest{
				Name: tt.pool,
				QueryOptions: structs.QueryOptions{
					Region:    "global",
					AuthToken: tt.token,
				},
			}
			var resp structs.NodePoolNodesResponse
			err := msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)

			// Rejected requests must not return any nodes.
			if tt.wantErr != "" {
				must.ErrorContains(t, err, tt.wantErr)
				must.SliceEmpty(t, resp.Nodes)
				return
			}
			must.NoError(t, err)

			// Check the IDs of the returned node stubs.
			ids := make([]string, 0, len(resp.Nodes))
			for _, stub := range resp.Nodes {
				ids = append(ids, stub.ID)
			}
			must.Eq(t, tt.want, ids)
		})
	}
}
// TestNodePoolEndpoint_ListNodes_BlockingQuery issues NodePool.ListNodes
// requests with a MinQueryIndex while a node in the pool is first registered
// and then deleted on a timer, and asserts that each response carries the
// index of the corresponding write.
func TestNodePoolEndpoint_ListNodes_BlockingQuery(t *testing.T) {
	ci.Parallel(t)

	s, cleanupS := TestServer(t, nil)
	defer cleanupS()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Populate state with some node pools.
	pool := mock.NodePool()
	err := s.fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
	must.NoError(t, err)

	// The delayed writes run on a timer goroutine, where fatal test assertions
	// must not be used. Their results are sent over this channel and asserted
	// on the test goroutine. The buffer lets the timer callback finish even if
	// the test has already failed.
	writeErrCh := make(chan error, 1)

	// Register node in pool.
	// Insert triggers watchers.
	node := mock.Node()
	node.NodePool = pool.Name
	time.AfterFunc(100*time.Millisecond, func() {
		writeErrCh <- s.fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, node)
	})

	req := &structs.NodePoolNodesRequest{
		Name: pool.Name,
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 1000,
		},
	}
	var resp structs.NodePoolNodesResponse
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)
	must.NoError(t, err)
	must.NoError(t, <-writeErrCh)
	must.Eq(t, 1001, resp.Index)

	// Delete triggers watchers.
	time.AfterFunc(100*time.Millisecond, func() {
		writeErrCh <- s.fsm.State().DeleteNode(structs.MsgTypeTestSetup, 1002, []string{node.ID})
	})

	req.MinQueryIndex = 1001
	err = msgpackrpc.CallWithCodec(codec, "NodePool.ListNodes", req, &resp)
	must.NoError(t, err)
	must.NoError(t, <-writeErrCh)
	must.Eq(t, 1002, resp.Index)
}