node pool: node pool upsert on multiregion node register (#17503)

When registering a node with a new node pool in a non-authoritative
region, we can't create the node pool because this new pool would not be
replicated to the other regions.

This commit modifies the node registration logic to allow automatic
node pool creation only in the authoritative region.

In non-authoritative regions, the client is registered, but the node
pool is not created. The client is kept in the `initializing` status until
its node pool is created in the authoritative region and replicated to
the client's region.
Luiz Aoqui 2023-06-13 11:28:28 -04:00 committed by GitHub
parent 952eb2713e
commit bc17cffaef
10 changed files with 448 additions and 191 deletions
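
In short, the change adds two gates. The following is a condensed sketch distilled from the diffs below, not a drop-in patch:

// In Node.Register: only the authoritative region asks the FSM to
// auto-create a missing node pool.
if n.srv.Region() == n.srv.config.AuthoritativeRegion {
	args.CreateNodePool = true
}

// In Node.UpdateStatus: a node whose pool doesn't exist yet is held in the
// initializing status.
pool, err := snap.NodePoolByName(ws, node.NodePool)
if err != nil {
	return fmt.Errorf("failed to query node pool: %v", err)
}
if pool == nil {
	args.Status = structs.NodeStatusInit
}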

View File

@ -1431,7 +1431,9 @@ Client Options:
-node-pool
Register this node in this node pool. If the node pool does not exist it
will be created automatically when the node registers.
will be created automatically if the node registers in the authoritative
region. In non-authoritative regions, the node is kept in the
'initializing' status until the node pool is created and replicated.
-meta
User-specified metadata to associate with the node. Each instance of -meta

View File

@ -204,9 +204,12 @@ type ClientConfig struct {
// NodeClass is used to group the node by class
NodeClass string `hcl:"node_class"`
// NodePool defines the node pool in which the client is registered. If the
// node pool does not exist it will be created automatically when the node
// registers.
// NodePool defines the node pool in which the client is registered.
//
// If the node pool does not exist, it will be created automatically if the
// node registers in the authoritative region. In non-authoritative
// regions, the node is kept in the 'initializing' status until the node
// pool is created and replicated.
NodePool string `hcl:"node_pool"`
// Options is used for configuration of nomad internals,
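
For illustration, a hypothetical agent configuration fragment that sets this field, shown as a Go raw string to match the surrounding source; the pool name "gpu" is invented:

// exampleClientHCL is an illustrative client block; node_pool maps to the
// NodePool field above via its hcl tag.
const exampleClientHCL = `
client {
  enabled   = true
  node_pool = "gpu"
}
`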

View File

@ -390,7 +390,13 @@ func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, inde
// Handle upgrade paths
req.Node.Canonicalize()
if err := n.state.UpsertNode(reqType, index, req.Node); err != nil {
// Upsert node.
var opts []state.NodeUpsertOption
if req.CreateNodePool {
opts = append(opts, state.NodeUpsertWithNodePool)
}
if err := n.state.UpsertNode(reqType, index, req.Node, opts...); err != nil {
n.logger.Error("UpsertNode failed", "error", err)
return err
}

View File

@ -244,100 +244,44 @@ func TestFSM_UpsertNode_Canonicalize_Ineligible(t *testing.T) {
func TestFSM_UpsertNode_NodePool(t *testing.T) {
ci.Parallel(t)
existingPoolName := "dev"
nodeWithPoolID := uuid.Generate()
nodeWithoutPoolID := uuid.Generate()
testCases := []struct {
name string
nodeID string
pool string
expectedPool string
validateFn func(*testing.T, *structs.Node, *structs.NodePool)
name string
setupReqFn func(*structs.NodeRegisterRequest)
validateFn func(*testing.T, *structs.Node, *structs.NodePool)
}{
{
name: "register new node in new node pool",
nodeID: "",
pool: "new",
expectedPool: "new",
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was created in the same transaction as the
// node registration.
must.Eq(t, pool.CreateIndex, node.ModifyIndex)
name: "node with empty node pool is placed in default",
setupReqFn: func(req *structs.NodeRegisterRequest) {
req.Node.NodePool = ""
},
},
{
name: "register new node in existing node pool",
nodeID: "",
pool: existingPoolName,
expectedPool: existingPoolName,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was not modified.
must.NotEq(t, pool.CreateIndex, node.ModifyIndex)
},
},
{
name: "register new node in built-in node pool",
nodeID: "",
pool: structs.NodePoolDefault,
expectedPool: structs.NodePoolDefault,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was not modified.
must.Eq(t, structs.NodePoolDefault, node.NodePool)
must.Eq(t, 1, pool.ModifyIndex)
},
},
{
name: "register new node with empty node pool in default",
nodeID: "",
pool: "",
expectedPool: structs.NodePoolDefault,
},
{
name: "move existing node to new node pool",
nodeID: nodeWithPoolID,
pool: "new",
expectedPool: "new",
name: "create new node pool with node",
setupReqFn: func(req *structs.NodeRegisterRequest) {
req.Node.NodePool = "new"
req.CreateNodePool = true
},
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was created in the same transaction as the
// node was updated.
must.Eq(t, pool.CreateIndex, node.ModifyIndex)
must.NotNil(t, pool)
must.Eq(t, "new", pool.Name)
must.Eq(t, pool.Name, node.NodePool)
must.Eq(t, node.ModifyIndex, pool.CreateIndex)
},
},
{
name: "move existing node to existing node pool",
nodeID: nodeWithPoolID,
pool: existingPoolName,
expectedPool: existingPoolName,
},
{
name: "move existing node to built-in node pool",
nodeID: nodeWithPoolID,
pool: structs.NodePoolDefault,
expectedPool: structs.NodePoolDefault,
},
{
name: "move existing node with empty node pool to default",
nodeID: nodeWithPoolID,
pool: "",
expectedPool: structs.NodePoolDefault,
},
{
name: "update node without pool to new node pool",
nodeID: nodeWithoutPoolID,
pool: "new",
expectedPool: "new",
},
{
name: "update node without pool to existing node pool",
nodeID: nodeWithoutPoolID,
pool: existingPoolName,
expectedPool: existingPoolName,
},
{
name: "update node without pool with empty string to default",
nodeID: nodeWithoutPoolID,
pool: "",
expectedPool: structs.NodePoolDefault,
name: "don't create new node pool with node",
setupReqFn: func(req *structs.NodeRegisterRequest) {
req.Node.NodePool = "new"
req.CreateNodePool = false
},
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
must.Nil(t, pool)
must.Eq(t, "new", node.NodePool)
},
},
}
@ -345,42 +289,13 @@ func TestFSM_UpsertNode_NodePool(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
fsm := testFSM(t)
// Populate state with a test node pool.
existingPool := mock.NodePool()
existingPool.Name = existingPoolName
err := fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{existingPool})
must.NoError(t, err)
// Populate state with pre-existing node assigned to the
// pre-existing node pool.
nodeWithPool := mock.Node()
nodeWithPool.ID = nodeWithPoolID
nodeWithPool.NodePool = existingPool.Name
err = fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, nodeWithPool)
must.NoError(t, err)
// Populate state with pre-existing node with empty node pool to
// simulate an upgrade path.
nodeWithoutPool := mock.Node()
nodeWithoutPool.ID = nodeWithoutPoolID
err = fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1002, nodeWithoutPool)
must.NoError(t, err)
// Upsert test node.
var node *structs.Node
switch tc.nodeID {
case nodeWithPoolID:
node = nodeWithPool.Copy()
case nodeWithoutPoolID:
node = nodeWithoutPool.Copy()
default:
node = mock.Node()
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
if tc.setupReqFn != nil {
tc.setupReqFn(&req)
}
// Set the node pool and apply node register log.
node.NodePool = tc.pool
req := structs.NodeRegisterRequest{Node: node}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
must.NoError(t, err)
@ -390,21 +305,14 @@ func TestFSM_UpsertNode_NodePool(t *testing.T) {
// Snapshot the state.
s := fsm.State()
// Verify node exists.
got, err := s.NodeByID(nil, node.ID)
gotNode, err := s.NodeByID(nil, node.ID)
must.NoError(t, err)
must.NotNil(t, got)
// Verify node pool exists.
pool, err := s.NodePoolByName(nil, tc.expectedPool)
gotPool, err := s.NodePoolByName(nil, gotNode.NodePool)
must.NoError(t, err)
must.NotNil(t, pool)
// Verify node was assigned to node pool.
must.Eq(t, tc.expectedPool, got.NodePool)
if tc.validateFn != nil {
tc.validateFn(t, got, pool)
tc.validateFn(t, gotNode, gotPool)
}
})
}

View File

@ -52,6 +52,10 @@ const (
// NodeHeartbeatEventReregistered is the message used when the node becomes
// reregistered by the heartbeat.
NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
// NodeWaitingForNodePool is the message used when the node is waiting for
// its node pool to be created.
NodeWaitingForNodePool = "Node registered but waiting for node pool to be created"
)
// Node endpoint is used for client interactions
@ -135,8 +139,6 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
if args.Node.SecretID == "" {
return fmt.Errorf("missing node secret ID for client registration")
}
// Validate node pool value if provided. The node is canonicalized in the
// FSM, where an empty node pool is set to "default".
if args.Node.NodePool != "" {
err := structs.ValidateNodePoolName(args.Node.NodePool)
if err != nil {
@ -160,6 +162,11 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
args.Node.SchedulingEligibility = structs.NodeSchedulingEligible
}
// Default the node pool if none is given.
if args.Node.NodePool == "" {
args.Node.NodePool = structs.NodePoolDefault
}
// Set the timestamp when the node is registered
args.Node.StatusUpdatedAt = time.Now().Unix()
@ -201,7 +208,18 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
n.srv.addNodeConn(n.ctx)
}
// Commit this update via Raft
// Commit this update via Raft.
//
// Only the authoritative region is allowed to create the node pool for the
// node if it doesn't exist yet. This prevents non-authoritative regions
// from having to push their local state to the authoritative region.
//
// Nodes in non-authoritative regions that are registered with a new node
// pool are kept in the `initializing` status until the node pool is
// created and replicated.
if n.srv.Region() == n.srv.config.AuthoritativeRegion {
args.CreateNodePool = true
}
_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
if err != nil {
n.logger.Error("register failed", "error", err)
@ -491,15 +509,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
// Clients with non-terminal allocations must first call UpdateAlloc to be able
// to transition from the initializing status to ready.
//
// A client's node pool must exist before it can transition from
// initializing to ready.
//
//            ┌────────────────────────────────────── No ───┐
//            │                                              │
//         ┌──▼───┐          ┌─────────────┐       ┌────────┴────────┐
// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │
//         └──▲──▲┘          └─────┬───────┘       └────────┬────────┘
//            │  │                 │                        │
//            │  │                 └─ No ─┐  ┌─────── Yes ──┘
//            │  │                        │  │
//            │  │               ┌────────▼──▼───────┐
//            │  └──────────No───┤ Node pool exists? │
//            │                  └─────────┬─────────┘
//            │                            │
//          ready                         Yes
//            │                            │
//     ┌──────┴───────┐                ┌───▼───┐          ┌──────┐
//     │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │
//     └──────────────┘                └───▲───┘          └──┬───┘
//                                         │                 │
@ -567,6 +594,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
switch node.Status {
case structs.NodeStatusInit:
if args.Status == structs.NodeStatusReady {
// Keep node in the initializing status if it has allocations but
// they are not updated.
allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
if err != nil {
return fmt.Errorf("failed to query node allocs: %v", err)
@ -574,8 +603,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
if len(allocs) > 0 && !allocsUpdated {
n.logger.Debug(fmt.Sprintf("marking node as %s due to outdated allocation information", structs.NodeStatusInit))
args.Status = structs.NodeStatusInit
}
// Keep node in the initializing status if it's in a node pool that
// doesn't exist.
pool, err := snap.NodePoolByName(ws, node.NodePool)
if err != nil {
return fmt.Errorf("failed to query node pool: %v", err)
}
if pool == nil {
n.logger.Debug(fmt.Sprintf("marking node as %s due to missing node pool", structs.NodeStatusInit))
args.Status = structs.NodeStatusInit
if !node.HasEvent(NodeWaitingForNodePool) {
args.NodeEvent = structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeWaitingForNodePool).
AddDetail("node_pool", node.NodePool)
}
}
}
case structs.NodeStatusDisconnected:
if args.Status == structs.NodeStatusReady {
@ -585,7 +632,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
if node.Status != args.Status || args.NodeEvent != nil {
// Attach an event if we are updating the node status to ready when it
// is down via a heartbeat
if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
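
A node held in `initializing` this way becomes `ready` on a later heartbeat once its pool exists in its region, either via replication from the authoritative region or an explicit pool upsert. A sketch of the explicit path, using the same RPC the multiregion test below exercises (the codec, pool, and region names are the test's):

req := &structs.NodePoolUpsertRequest{
	NodePools:    []*structs.NodePool{{Name: "new-pool-region-2"}},
	WriteRequest: structs.WriteRequest{Region: "region-2"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec2, "NodePool.UpsertNodePools", req, &resp)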

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net"
"net/rpc"
"reflect"
"strings"
"testing"
@ -28,6 +29,7 @@ import (
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -300,6 +302,205 @@ func TestClientEndpoint_Register_NodePool(t *testing.T) {
}
}
func TestClientEndpoint_Register_NodePool_Multiregion(t *testing.T) {
ci.Parallel(t)
// Helper function to set up a client heartbeat.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
heartbeat := func(ctx context.Context, codec rpc.ClientCodec, req *structs.NodeUpdateStatusRequest) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
var resp structs.NodeUpdateResponse
msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
}
}
// Create servers in two regions.
s1, rootToken1, cleanupS1 := TestACLServer(t, func(c *Config) {
c.Region = "region-1"
c.AuthoritativeRegion = "region-1"
})
defer cleanupS1()
codec1 := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
c.Region = "region-2"
c.AuthoritativeRegion = "region-1"
// Speed up replication for testing.
c.ReplicationBackoff = 500 * time.Millisecond
c.ReplicationToken = rootToken1.SecretID
})
defer cleanupS2()
codec2 := rpcClient(t, s2)
testutil.WaitForLeader(t, s2.RPC)
// Verify that registering a node with a new node pool in the authoritative
// region creates the node pool.
node1 := mock.Node()
node1.Status = ""
node1.NodePool = "new-pool-region-1"
// Register node in region-1.
req := &structs.NodeRegisterRequest{
Node: node1,
WriteRequest: structs.WriteRequest{Region: "region-1"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec1, "Node.Register", req, &resp)
must.NoError(t, err)
// Set up heartbeat for the node in region-1.
go heartbeat(ctx, rpcClient(t, s1), &structs.NodeUpdateStatusRequest{
NodeID: node1.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "region-1"},
})
// Verify client becomes ready.
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
n, err := s1.State().NodeByID(nil, node1.ID)
if err != nil {
return err
}
if n.Status != structs.NodeStatusReady {
return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusReady, n.Status)
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(time.Second),
))
// Verify that registering a node with a new node pool in the
// non-authoritative region does not create the node pool and the client is
// kept in the initializing status.
node2 := mock.Node()
node2.Status = ""
node2.NodePool = "new-pool-region-2"
// Register node in region-2.
req = &structs.NodeRegisterRequest{
Node: node2,
WriteRequest: structs.WriteRequest{Region: "region-2"},
}
err = msgpackrpc.CallWithCodec(codec2, "Node.Register", req, &resp)
must.NoError(t, err)
// Set up heartbeat for the node in region-2.
go heartbeat(ctx, rpcClient(t, s2), &structs.NodeUpdateStatusRequest{
NodeID: node2.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "region-2"},
})
// Verify client is kept in the initializing status and has an event
// for the missing node pool.
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
n, err := s2.State().NodeByID(nil, node2.ID)
if err != nil {
return err
}
if !n.HasEvent(NodeWaitingForNodePool) {
return fmt.Errorf("node pool missing event not found:\n%v", n.Events)
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(time.Second),
))
must.Wait(t, wait.ContinualSuccess(
wait.ErrorFunc(func() error {
n, err := s2.State().NodeByID(nil, node2.ID)
if err != nil {
return err
}
if n.Status != structs.NodeStatusInit {
return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusInit, n.Status)
}
return nil
}),
wait.Timeout(time.Second),
wait.Gap(time.Second),
))
// Federate regions.
TestJoin(t, s1, s2)
// Verify node pool from authoritative region is replicated.
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
poolName := node1.NodePool
pool, err := s2.State().NodePoolByName(nil, poolName)
if err != nil {
return err
}
if pool == nil {
return fmt.Errorf("node pool %s not found in region-2", poolName)
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(time.Second),
))
// Create node pool for region-2.
nodePoolReq := &structs.NodePoolUpsertRequest{
NodePools: []*structs.NodePool{{Name: node2.NodePool}},
WriteRequest: structs.WriteRequest{
Region: "region-2",
AuthToken: rootToken1.SecretID,
},
}
var nodePoolResp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec2, "NodePool.UpsertNodePools", nodePoolReq, &nodePoolResp)
must.NoError(t, err)
// Verify node pool exists in both regions and the node in region-2 is now
// ready.
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
for region, s := range map[string]*state.StateStore{
"region-1": s1.State(),
"region-2": s2.State(),
} {
poolName := node2.NodePool
pool, err := s.NodePoolByName(nil, poolName)
if err != nil {
return err
}
if pool == nil {
return fmt.Errorf("expected node pool %s to exist in region %s", poolName, region)
}
}
n, err := s2.State().NodeByID(nil, node2.ID)
if err != nil {
return err
}
if n.Status != structs.NodeStatusReady {
return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusReady, n.Status)
}
return nil
}),
wait.Timeout(10*time.Second),
wait.Gap(time.Second),
))
}
// Test the deprecated single node deregistration path
func TestClientEndpoint_DeregisterOne(t *testing.T) {
ci.Parallel(t)

View File

@ -41,6 +41,15 @@ const (
SortReverse SortOption = true
)
// NodeUpsertOption represents options to configure a NodeUpsert operation.
type NodeUpsertOption uint8
const (
// NodeUpsertWithNodePool indicates that the node pool in the node should
// be created if it doesn't exist.
NodeUpsertWithNodePool NodeUpsertOption = iota
)
const (
// NodeEligibilityEventPlanRejectThreshold is the message used when the node
// is set to ineligible due to multiple plan failures.
@ -896,15 +905,17 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri
// UpsertNode is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node) error {
func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node, opts ...NodeUpsertOption) error {
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
// Create node pool if necessary.
if node.NodePool != "" {
_, err := s.fetchOrCreateNodePoolTxn(txn, index, node.NodePool)
if err != nil {
return err
for _, opt := range opts {
// Create node pool if necessary.
if opt == NodeUpsertWithNodePool && node.NodePool != "" {
_, err := s.fetchOrCreateNodePoolTxn(txn, index, node.NodePool)
if err != nil {
return err
}
}
}
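
A minimal usage sketch of the variadic option from a caller's perspective, mirroring the FSM change above (store, createPool, index, and node are stand-in names):

// Opt into pool creation only when the request asked for it.
opts := []state.NodeUpsertOption{}
if createPool {
	opts = append(opts, state.NodeUpsertWithNodePool)
}
if err := store.UpsertNode(structs.NodeRegisterRequestType, index, node, opts...); err != nil {
	return err
}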

View File

@ -1284,50 +1284,105 @@ func TestStateStore_UpsertNode_NodePool(t *testing.T) {
nodeWithoutPoolID := uuid.Generate()
testCases := []struct {
name string
nodeID string
poolName string
expectedPool string
expectedNewPool bool
name string
nodeID string
pool string
createPool bool
expectedPool string
expectedPoolExists bool
validateFn func(*testing.T, *structs.Node, *structs.NodePool)
}{
{
name: "register new node in existing pool",
nodeID: "",
poolName: devPoolName,
expectedPool: devPoolName,
name: "register new node in new node pool",
nodeID: "",
pool: "new",
createPool: true,
expectedPool: "new",
expectedPoolExists: true,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was created in the same transaction as the
// node registration.
must.Eq(t, pool.CreateIndex, node.ModifyIndex)
},
},
{
name: "register new node in new pool",
nodeID: "",
poolName: "new",
expectedPool: "new",
expectedNewPool: true,
name: "register new node in existing node pool",
nodeID: "",
pool: devPoolName,
expectedPool: devPoolName,
expectedPoolExists: true,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was not modified.
must.NotEq(t, pool.CreateIndex, node.ModifyIndex)
},
},
{
name: "update existing node in existing pool",
nodeID: nodeWithPoolID,
poolName: devPoolName,
expectedPool: devPoolName,
name: "register new node in built-in node pool",
nodeID: "",
pool: structs.NodePoolDefault,
expectedPool: structs.NodePoolDefault,
expectedPoolExists: true,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was not modified.
must.Eq(t, 1, pool.ModifyIndex)
},
},
{
name: "update existing node with new pool",
nodeID: nodeWithPoolID,
poolName: "new",
expectedPool: "new",
expectedNewPool: true,
name: "move existing node to new node pool",
nodeID: nodeWithPoolID,
pool: "new",
createPool: true,
expectedPool: "new",
expectedPoolExists: true,
validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) {
// Verify node pool was created in the same transaction as the
// node was updated.
must.Eq(t, pool.CreateIndex, node.ModifyIndex)
},
},
{
name: "update legacy node with pool",
nodeID: nodeWithoutPoolID,
poolName: devPoolName,
expectedPool: devPoolName,
name: "move existing node to existing node pool",
nodeID: nodeWithPoolID,
pool: devPoolName,
expectedPool: devPoolName,
expectedPoolExists: true,
},
{
name: "update legacy node with new pool",
nodeID: nodeWithoutPoolID,
poolName: "new",
expectedPool: "new",
expectedNewPool: true,
name: "move existing node to built-in node pool",
nodeID: nodeWithPoolID,
pool: structs.NodePoolDefault,
expectedPool: structs.NodePoolDefault,
expectedPoolExists: true,
},
{
name: "update node without pool to new node pool",
nodeID: nodeWithoutPoolID,
pool: "new",
createPool: true,
expectedPool: "new",
expectedPoolExists: true,
},
{
name: "update node without pool to existing node pool",
nodeID: nodeWithoutPoolID,
pool: devPoolName,
expectedPool: devPoolName,
expectedPoolExists: true,
},
{
name: "update node without pool with empty string to default",
nodeID: nodeWithoutPoolID,
pool: "",
expectedPool: structs.NodePoolDefault,
expectedPoolExists: true,
},
{
name: "register new node in new node pool without creating it",
nodeID: "",
pool: "new",
createPool: false,
expectedPool: "new",
expectedPoolExists: false,
},
}
@ -1366,26 +1421,34 @@ func TestStateStore_UpsertNode_NodePool(t *testing.T) {
default:
node = mock.Node()
}
node.NodePool = tc.poolName
err = state.UpsertNode(structs.MsgTypeTestSetup, 1003, node)
node.NodePool = tc.pool
opts := []NodeUpsertOption{}
if tc.createPool {
opts = append(opts, NodeUpsertWithNodePool)
}
err = state.UpsertNode(structs.MsgTypeTestSetup, 1003, node, opts...)
must.NoError(t, err)
// Verify that node is part of the expected pool.
got, err := state.NodeByID(nil, node.ID)
must.NoError(t, err)
must.Eq(t, tc.expectedPool, got.NodePool)
must.NotNil(t, got)
// Fetch pool.
// Verify node pool exists if requested.
pool, err := state.NodePoolByName(nil, tc.expectedPool)
must.NoError(t, err)
must.NotNil(t, pool)
if tc.expectedNewPool {
// Verify that pool was created along with node registration.
must.Eq(t, node.ModifyIndex, pool.CreateIndex)
if tc.expectedPoolExists {
must.NotNil(t, pool)
} else {
// Verify that pool was not modified.
must.Less(t, node.ModifyIndex, pool.ModifyIndex)
must.Nil(t, pool)
}
// Verify node was assigned to node pool.
must.Eq(t, tc.expectedPool, got.NodePool)
if tc.validateFn != nil {
tc.validateFn(t, got, pool)
}
})
}

View File

@ -581,6 +581,11 @@ type WriteMeta struct {
type NodeRegisterRequest struct {
Node *Node
NodeEvent *NodeEvent
// CreateNodePool is used to indicate that the node's node pool should be
// created along with the node registration if it doesn't exist.
CreateNodePool bool
WriteRequest
}
@ -2363,6 +2368,16 @@ func (n *Node) IsInPool(pool string) bool {
return pool == NodePoolAll || n.NodePool == pool
}
// HasEvent returns true if the node has the given message in its events list.
func (n *Node) HasEvent(msg string) bool {
for _, ev := range n.Events {
if ev.Message == msg {
return true
}
}
return false
}
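
// Usage sketch: the Node endpoint above checks HasEvent before emitting the
// NodeWaitingForNodePool event, so a node stuck in `initializing` doesn't
// accumulate a duplicate event on every heartbeat:
//
//	if pool == nil && !node.HasEvent(NodeWaitingForNodePool) {
//		args.NodeEvent = structs.NewNodeEvent().
//			SetSubsystem(structs.NodeEventSubsystemCluster).
//			SetMessage(NodeWaitingForNodePool).
//			AddDetail("node_pool", node.NodePool)
//	}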
// Stub returns a summarized version of the node
func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {

View File

@ -88,8 +88,9 @@ client {
- `node_pool` `(string: "default")` - Specifies the node pool in which the
client is registered. If the node pool does not exist yet, it will be created
automatically when the node registers with the cluster. If not specified, the
`default` node pool is used.
automatically if the node registers in the authoritative region. In
non-authoritative regions, the node is kept in the `initializing` status
until the node pool is created and replicated.
- `options` <code>([Options](#options-parameters): nil)</code> - Specifies a
key-value mapping of internal configuration for clients, such as for driver