diff --git a/command/agent/command.go b/command/agent/command.go index 4c8bb2cef..c28b01430 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -1431,7 +1431,9 @@ Client Options: -node-pool Register this node in this node pool. If the node pool does not exist it - will be created automatically when the node registers. + will be created automatically if the node registers in the authoritative + region. In non-authoritative regions, the node is kept in the + 'initializing' status until the node pool is created and replicated. -meta User specified metadata to associated with the node. Each instance of -meta diff --git a/command/agent/config.go b/command/agent/config.go index ab14df123..f8f299401 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -204,9 +204,12 @@ type ClientConfig struct { // NodeClass is used to group the node by class NodeClass string `hcl:"node_class"` - // NodePool defines the node pool in which the client is registered. If the - // node pool does not exist it will be created automatically when the node - // registers. + // NodePool defines the node pool in which the client is registered. + // + // If the node pool does not exist, it will be created automatically if the + // node registers in the authoritative region. In non-authoritative + // regions, the node is kept in the 'initializing' status until the node + // pool is created and replicated. NodePool string `hcl:"node_pool"` // Options is used for configuration of nomad internals, diff --git a/nomad/fsm.go b/nomad/fsm.go index 5de959fb9..818c97dd3 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -390,7 +390,13 @@ func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, inde // Handle upgrade paths req.Node.Canonicalize() - if err := n.state.UpsertNode(reqType, index, req.Node); err != nil { + // Upsert node. + var opts []state.NodeUpsertOption + if req.CreateNodePool { + opts = append(opts, state.NodeUpsertWithNodePool) + } + + if err := n.state.UpsertNode(reqType, index, req.Node, opts...); err != nil { n.logger.Error("UpsertNode failed", "error", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 8e800300c..e937ec702 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -244,100 +244,44 @@ func TestFSM_UpsertNode_Canonicalize_Ineligible(t *testing.T) { func TestFSM_UpsertNode_NodePool(t *testing.T) { ci.Parallel(t) - existingPoolName := "dev" - nodeWithPoolID := uuid.Generate() - nodeWithoutPoolID := uuid.Generate() - testCases := []struct { - name string - nodeID string - pool string - expectedPool string - validateFn func(*testing.T, *structs.Node, *structs.NodePool) + name string + setupReqFn func(*structs.NodeRegisterRequest) + validateFn func(*testing.T, *structs.Node, *structs.NodePool) }{ { - name: "register new node in new node pool", - nodeID: "", - pool: "new", - expectedPool: "new", - validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { - // Verify node pool was created in the same transaction as the - // node registration. - must.Eq(t, pool.CreateIndex, node.ModifyIndex) + name: "node with empty node pool is placed in defualt", + setupReqFn: func(req *structs.NodeRegisterRequest) { + req.Node.NodePool = "" }, - }, - { - name: "register new node in existing node pool", - nodeID: "", - pool: existingPoolName, - expectedPool: existingPoolName, validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { - // Verify node pool was not modified. - must.NotEq(t, pool.CreateIndex, node.ModifyIndex) - }, - }, - { - name: "register new node in built-in node pool", - nodeID: "", - pool: structs.NodePoolDefault, - expectedPool: structs.NodePoolDefault, - validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { - // Verify node pool was not modified. + must.Eq(t, structs.NodePoolDefault, node.NodePool) must.Eq(t, 1, pool.ModifyIndex) }, }, { - name: "register new node with empty node pool in default", - nodeID: "", - pool: "", - expectedPool: structs.NodePoolDefault, - }, - { - name: "move existing node to new node pool", - nodeID: nodeWithPoolID, - pool: "new", - expectedPool: "new", + name: "create new node pool with node", + setupReqFn: func(req *structs.NodeRegisterRequest) { + req.Node.NodePool = "new" + req.CreateNodePool = true + }, validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { - // Verify node pool was created in the same transaction as the - // node was updated. - must.Eq(t, pool.CreateIndex, node.ModifyIndex) + must.NotNil(t, pool) + must.Eq(t, "new", pool.Name) + must.Eq(t, pool.Name, node.NodePool) + must.Eq(t, node.ModifyIndex, pool.CreateIndex) }, }, { - name: "move existing node to existing node pool", - nodeID: nodeWithPoolID, - pool: existingPoolName, - expectedPool: existingPoolName, - }, - { - name: "move existing node to built-in node pool", - nodeID: nodeWithPoolID, - pool: structs.NodePoolDefault, - expectedPool: structs.NodePoolDefault, - }, - { - name: "move existing node with empty node pool to default", - nodeID: nodeWithPoolID, - pool: "", - expectedPool: structs.NodePoolDefault, - }, - { - name: "update node without pool to new node pool", - nodeID: nodeWithoutPoolID, - pool: "new", - expectedPool: "new", - }, - { - name: "update node without pool to existing node pool", - nodeID: nodeWithoutPoolID, - pool: existingPoolName, - expectedPool: existingPoolName, - }, - { - name: "update node without pool with empty string to default", - nodeID: nodeWithoutPoolID, - pool: "", - expectedPool: structs.NodePoolDefault, + name: "don't create new node pool with node", + setupReqFn: func(req *structs.NodeRegisterRequest) { + req.Node.NodePool = "new" + req.CreateNodePool = false + }, + validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { + must.Nil(t, pool) + must.Eq(t, "new", node.NodePool) + }, }, } @@ -345,42 +289,13 @@ func TestFSM_UpsertNode_NodePool(t *testing.T) { t.Run(tc.name, func(t *testing.T) { fsm := testFSM(t) - // Populate state with a test node pool. - existingPool := mock.NodePool() - existingPool.Name = existingPoolName - err := fsm.State().UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{existingPool}) - must.NoError(t, err) - - // Populate state with pre-existing node assigned to the - // pre-existing node pool. - nodeWithPool := mock.Node() - nodeWithPool.ID = nodeWithPoolID - nodeWithPool.NodePool = existingPool.Name - - err = fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1001, nodeWithPool) - must.NoError(t, err) - - // Populate state with pre-existing node with empty node pool to - // simulate an upgrade path. - nodeWithoutPool := mock.Node() - nodeWithoutPool.ID = nodeWithoutPoolID - err = fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1002, nodeWithoutPool) - must.NoError(t, err) - - // Upsert test node. - var node *structs.Node - switch tc.nodeID { - case nodeWithPoolID: - node = nodeWithPool.Copy() - case nodeWithoutPoolID: - node = nodeWithoutPool.Copy() - default: - node = mock.Node() + node := mock.Node() + req := structs.NodeRegisterRequest{ + Node: node, + } + if tc.setupReqFn != nil { + tc.setupReqFn(&req) } - - // Set the node pool and apply node register log. - node.NodePool = tc.pool - req := structs.NodeRegisterRequest{Node: node} buf, err := structs.Encode(structs.NodeRegisterRequestType, req) must.NoError(t, err) @@ -390,21 +305,14 @@ func TestFSM_UpsertNode_NodePool(t *testing.T) { // Snapshot the state. s := fsm.State() - // Verify node exists. - got, err := s.NodeByID(nil, node.ID) + gotNode, err := s.NodeByID(nil, node.ID) must.NoError(t, err) - must.NotNil(t, got) - // Verify node pool exists. - pool, err := s.NodePoolByName(nil, tc.expectedPool) + gotPool, err := s.NodePoolByName(nil, gotNode.NodePool) must.NoError(t, err) - must.NotNil(t, pool) - - // Verify node was assigned to node pool. - must.Eq(t, tc.expectedPool, got.NodePool) if tc.validateFn != nil { - tc.validateFn(t, got, pool) + tc.validateFn(t, gotNode, gotPool) } }) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 29cd571f8..1172692c3 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,6 +52,10 @@ const ( // NodeHeartbeatEventReregistered is the message used when the node becomes // reregistered by the heartbeat. NodeHeartbeatEventReregistered = "Node reregistered by heartbeat" + + // NodeWaitingForNodePool is the message used when the node is waiting for + // its node pool to be created. + NodeWaitingForNodePool = "Node registered but waiting for node pool to be created" ) // Node endpoint is used for client interactions @@ -135,8 +139,6 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp if args.Node.SecretID == "" { return fmt.Errorf("missing node secret ID for client registration") } - // Validate node pool value if provided. The node is canonicalized in the - // FSM, where an empty node pool is set to "default". if args.Node.NodePool != "" { err := structs.ValidateNodePoolName(args.Node.NodePool) if err != nil { @@ -160,6 +162,11 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp args.Node.SchedulingEligibility = structs.NodeSchedulingEligible } + // Default the node pool if none is given. + if args.Node.NodePool == "" { + args.Node.NodePool = structs.NodePoolDefault + } + // Set the timestamp when the node is registered args.Node.StatusUpdatedAt = time.Now().Unix() @@ -201,7 +208,18 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp n.srv.addNodeConn(n.ctx) } - // Commit this update via Raft + // Commit this update via Raft. + // + // Only the authoritative region is allowed to create the node pool for the + // node if it doesn't exist yet. This prevents non-authoritative regions + // from having to push their local state to the authoritative region. + // + // Nodes in non-authoritative regions that are registered with a new node + // pool are kept in the `initializing` status until the node pool is + // created and replicated. + if n.srv.Region() == n.srv.config.AuthoritativeRegion { + args.CreateNodePool = true + } _, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args) if err != nil { n.logger.Error("register failed", "error", err) @@ -491,15 +509,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, // Clients with non-terminal allocations must first call UpdateAlloc to be able // to transition from the initializing status to ready. // +// Clients node pool must exist for them to be able to transition from +// initializing to ready. +// // ┌────────────────────────────────────── No ───┐ // │ │ // ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐ // ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │ -// └──▲───┘ └─────┬───────┘ └────────┬────────┘ -// │ │ │ -// ready └─ No ─┐ ┌─────── Yes ──┘ -// │ │ │ -// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐ +// └──▲──▲┘ └─────┬───────┘ └────────┬────────┘ +// │ │ │ │ +// │ │ └─ No ─┐ ┌─────── Yes ──┘ +// │ │ │ │ +// │ │ ┌────────▼──▼───────┐ +// │ └──────────No───┤ Node pool exists? │ +// │ └─────────┬─────────┘ +// │ │ +// ready Yes +// │ │ +// ┌──────┴───────┐ ┌───▼───┐ ┌──────┐ // │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │ // └──────────────┘ └───▲───┘ └──┬───┘ // │ │ @@ -567,6 +594,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct switch node.Status { case structs.NodeStatusInit: if args.Status == structs.NodeStatusReady { + // Keep node in the initializing status if it has allocations but + // they are not updated. allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false) if err != nil { return fmt.Errorf("failed to query node allocs: %v", err) @@ -574,8 +603,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex if len(allocs) > 0 && !allocsUpdated { + n.logger.Debug("marking node as %s due to outdated allocation information", structs.NodeStatusInit) args.Status = structs.NodeStatusInit } + + // Keep node in the initialing status if it's in a node pool that + // doesn't exist. + pool, err := snap.NodePoolByName(ws, node.NodePool) + if err != nil { + return fmt.Errorf("failed to query node pool: %v", err) + } + if pool == nil { + n.logger.Debug("marking node as %s due to missing node pool", structs.NodeStatusInit) + args.Status = structs.NodeStatusInit + if !node.HasEvent(NodeWaitingForNodePool) { + args.NodeEvent = structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeWaitingForNodePool). + AddDetail("node_pool", node.NodePool) + } + } } case structs.NodeStatusDisconnected: if args.Status == structs.NodeStatusReady { @@ -585,7 +632,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Commit this update via Raft var index uint64 - if node.Status != args.Status { + if node.Status != args.Status || args.NodeEvent != nil { // Attach an event if we are updating the node status to ready when it // is down via a heartbeat if node.Status == structs.NodeStatusDown && args.NodeEvent == nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 83e25307d..dc30e400d 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net" + "net/rpc" "reflect" "strings" "testing" @@ -28,6 +29,7 @@ import ( vapi "github.com/hashicorp/vault/api" "github.com/kr/pretty" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -300,6 +302,205 @@ func TestClientEndpoint_Register_NodePool(t *testing.T) { } } +func TestClientEndpoint_Register_NodePool_Multiregion(t *testing.T) { + ci.Parallel(t) + + // Helper function to setup client heartbeat. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + heartbeat := func(ctx context.Context, codec rpc.ClientCodec, req *structs.NodeUpdateStatusRequest) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + default: + } + + var resp structs.NodeUpdateResponse + msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp) + } + } + + // Create servers in two regions. + s1, rootToken1, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region-1" + c.AuthoritativeRegion = "region-1" + }) + defer cleanupS1() + codec1 := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region-2" + c.AuthoritativeRegion = "region-1" + + // Speed-up replication for testing. + c.ReplicationBackoff = 500 * time.Millisecond + c.ReplicationToken = rootToken1.SecretID + }) + defer cleanupS2() + codec2 := rpcClient(t, s2) + testutil.WaitForLeader(t, s2.RPC) + + // Verify that registering a node with a new node pool in the authoritative + // region creates the node pool. + node1 := mock.Node() + node1.Status = "" + node1.NodePool = "new-pool-region-1" + + // Register node in region-1. + req := &structs.NodeRegisterRequest{ + Node: node1, + WriteRequest: structs.WriteRequest{Region: "region-1"}, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec1, "Node.Register", req, &resp) + must.NoError(t, err) + + // Setup heartbeat for node in region-1. + go heartbeat(ctx, rpcClient(t, s1), &structs.NodeUpdateStatusRequest{ + NodeID: node1.ID, + Status: structs.NodeStatusReady, + WriteRequest: structs.WriteRequest{Region: "region-1"}, + }) + + // Verify client becomes ready. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + n, err := s1.State().NodeByID(nil, node1.ID) + if err != nil { + return err + } + if n.Status != structs.NodeStatusReady { + return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusReady, n.Status) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(time.Second), + )) + + // Verify that registering a node with a new node pool in the + // non-authoritative region does not create the node pool and the client is + // kept in the initializing status. + node2 := mock.Node() + node2.Status = "" + node2.NodePool = "new-pool-region-2" + + // Register node in region-2. + req = &structs.NodeRegisterRequest{ + Node: node2, + WriteRequest: structs.WriteRequest{Region: "region-2"}, + } + err = msgpackrpc.CallWithCodec(codec2, "Node.Register", req, &resp) + must.NoError(t, err) + + // Setup heartbeat for node in region-2. + go heartbeat(ctx, rpcClient(t, s2), &structs.NodeUpdateStatusRequest{ + NodeID: node2.ID, + Status: structs.NodeStatusReady, + WriteRequest: structs.WriteRequest{Region: "region-2"}, + }) + + // Verify client is kept at the initializing status and has a node pool + // missing event. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + n, err := s2.State().NodeByID(nil, node2.ID) + if err != nil { + return err + } + if !n.HasEvent(NodeWaitingForNodePool) { + return fmt.Errorf("node pool missing event not found:\n%v", n.Events) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(time.Second), + )) + must.Wait(t, wait.ContinualSuccess( + wait.ErrorFunc(func() error { + n, err := s2.State().NodeByID(nil, node2.ID) + if err != nil { + return err + } + if n.Status != structs.NodeStatusInit { + return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusInit, n.Status) + } + return nil + }), + wait.Timeout(time.Second), + wait.Gap(time.Second), + )) + + // Federate regions. + TestJoin(t, s1, s2) + + // Verify node pool from authoritative region is replicated. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + poolName := node1.NodePool + pool, err := s2.State().NodePoolByName(nil, poolName) + if err != nil { + return err + } + if pool == nil { + return fmt.Errorf("node pool %s not found in region-2", poolName) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(time.Second), + )) + + // Create node pool for region-2. + nodePoolReq := &structs.NodePoolUpsertRequest{ + NodePools: []*structs.NodePool{{Name: node2.NodePool}}, + WriteRequest: structs.WriteRequest{ + Region: "region-2", + AuthToken: rootToken1.SecretID, + }, + } + var nodePoolResp *structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec2, "NodePool.UpsertNodePools", nodePoolReq, &nodePoolResp) + must.NoError(t, err) + + // Verify node pool exists in both regions and the node in region-2 is now + // ready. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + for region, s := range map[string]*state.StateStore{ + "region-1": s1.State(), + "region-2": s2.State(), + } { + poolName := node2.NodePool + pool, err := s.NodePoolByName(nil, poolName) + if err != nil { + return err + } + if pool == nil { + return fmt.Errorf("expected node pool %s to exist in region %s", poolName, region) + } + } + + n, err := s2.State().NodeByID(nil, node2.ID) + if err != nil { + return err + } + if n.Status != structs.NodeStatusReady { + return fmt.Errorf("expected node to be %s, got %s", structs.NodeStatusReady, n.Status) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(time.Second), + )) +} + // Test the deprecated single node deregistration path func TestClientEndpoint_DeregisterOne(t *testing.T) { ci.Parallel(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 4fe907060..f8f959c2c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -41,6 +41,15 @@ const ( SortReverse SortOption = true ) +// NodeUpsertOption represents options to configure a NodeUpsert operation. +type NodeUpsertOption uint8 + +const ( + // NodeUpsertWithNodePool indicates that the node pool in the node should + // be created if it doesn't exist. + NodeUpsertWithNodePool NodeUpsertOption = iota +) + const ( // NodeEligibilityEventPlanRejectThreshold is the message used when the node // is set to ineligible due to multiple plan failures. @@ -896,15 +905,17 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. -func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node) error { +func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node, opts ...NodeUpsertOption) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() - // Create node pool if necessary. - if node.NodePool != "" { - _, err := s.fetchOrCreateNodePoolTxn(txn, index, node.NodePool) - if err != nil { - return err + for _, opt := range opts { + // Create node pool if necessary. + if opt == NodeUpsertWithNodePool && node.NodePool != "" { + _, err := s.fetchOrCreateNodePoolTxn(txn, index, node.NodePool) + if err != nil { + return err + } } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 2296c1582..2ec12aa00 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1284,50 +1284,105 @@ func TestStateStore_UpsertNode_NodePool(t *testing.T) { nodeWithoutPoolID := uuid.Generate() testCases := []struct { - name string - nodeID string - poolName string - expectedPool string - expectedNewPool bool + name string + nodeID string + pool string + createPool bool + expectedPool string + expectedPoolExists bool + validateFn func(*testing.T, *structs.Node, *structs.NodePool) }{ { - name: "register new node in existing pool", - nodeID: "", - poolName: devPoolName, - expectedPool: devPoolName, + name: "register new node in new node pool", + nodeID: "", + pool: "new", + createPool: true, + expectedPool: "new", + expectedPoolExists: true, + validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { + // Verify node pool was created in the same transaction as the + // node registration. + must.Eq(t, pool.CreateIndex, node.ModifyIndex) + }, }, { - name: "register new node in new pool", - nodeID: "", - poolName: "new", - expectedPool: "new", - expectedNewPool: true, + name: "register new node in existing node pool", + nodeID: "", + pool: devPoolName, + expectedPool: devPoolName, + expectedPoolExists: true, + validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { + // Verify node pool was not modified. + must.NotEq(t, pool.CreateIndex, node.ModifyIndex) + }, }, { - name: "update existing node in existing pool", - nodeID: nodeWithPoolID, - poolName: devPoolName, - expectedPool: devPoolName, + name: "register new node in built-in node pool", + nodeID: "", + pool: structs.NodePoolDefault, + expectedPool: structs.NodePoolDefault, + expectedPoolExists: true, + validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { + // Verify node pool was not modified. + must.Eq(t, 1, pool.ModifyIndex) + }, }, { - name: "update existing node with new pool", - nodeID: nodeWithPoolID, - poolName: "new", - expectedPool: "new", - expectedNewPool: true, + name: "move existing node to new node pool", + nodeID: nodeWithPoolID, + pool: "new", + createPool: true, + expectedPool: "new", + expectedPoolExists: true, + validateFn: func(t *testing.T, node *structs.Node, pool *structs.NodePool) { + // Verify node pool was created in the same transaction as the + // node was updated. + must.Eq(t, pool.CreateIndex, node.ModifyIndex) + }, }, { - name: "update legacy node with pool", - nodeID: nodeWithoutPoolID, - poolName: devPoolName, - expectedPool: devPoolName, + name: "move existing node to existing node pool", + nodeID: nodeWithPoolID, + pool: devPoolName, + expectedPool: devPoolName, + expectedPoolExists: true, }, { - name: "update legacy node with new pool", - nodeID: nodeWithoutPoolID, - poolName: "new", - expectedPool: "new", - expectedNewPool: true, + name: "move existing node to built-in node pool", + nodeID: nodeWithPoolID, + pool: structs.NodePoolDefault, + expectedPool: structs.NodePoolDefault, + expectedPoolExists: true, + }, + { + name: "update node without pool to new node pool", + nodeID: nodeWithoutPoolID, + pool: "new", + createPool: true, + expectedPool: "new", + expectedPoolExists: true, + }, + { + name: "update node without pool to existing node pool", + nodeID: nodeWithoutPoolID, + pool: devPoolName, + expectedPool: devPoolName, + expectedPoolExists: true, + }, + { + name: "update node without pool with empty string to default", + nodeID: nodeWithoutPoolID, + pool: "", + expectedPool: structs.NodePoolDefault, + expectedPoolExists: true, + }, + { + name: "register new node in new node pool without creating it", + nodeID: "", + pool: "new", + createPool: false, + expectedPool: "new", + expectedPoolExists: false, }, } @@ -1366,26 +1421,34 @@ func TestStateStore_UpsertNode_NodePool(t *testing.T) { default: node = mock.Node() } - node.NodePool = tc.poolName - err = state.UpsertNode(structs.MsgTypeTestSetup, 1003, node) + + node.NodePool = tc.pool + opts := []NodeUpsertOption{} + if tc.createPool { + opts = append(opts, NodeUpsertWithNodePool) + } + err = state.UpsertNode(structs.MsgTypeTestSetup, 1003, node, opts...) must.NoError(t, err) // Verify that node is part of the expected pool. got, err := state.NodeByID(nil, node.ID) must.NoError(t, err) - must.Eq(t, tc.expectedPool, got.NodePool) + must.NotNil(t, got) - // Fech pool. + // Verify node pool exists if requests. pool, err := state.NodePoolByName(nil, tc.expectedPool) must.NoError(t, err) - must.NotNil(t, pool) - - if tc.expectedNewPool { - // Verify that pool was created along with node registration. - must.Eq(t, node.ModifyIndex, pool.CreateIndex) + if tc.expectedPoolExists { + must.NotNil(t, pool) } else { - // Verify that pool was not modified. - must.Less(t, node.ModifyIndex, pool.ModifyIndex) + must.Nil(t, pool) + } + + // Verify node was assigned to node pool. + must.Eq(t, tc.expectedPool, got.NodePool) + + if tc.validateFn != nil { + tc.validateFn(t, got, pool) } }) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f2bb1c648..ecb681a73 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -581,6 +581,11 @@ type WriteMeta struct { type NodeRegisterRequest struct { Node *Node NodeEvent *NodeEvent + + // CreateNodePool is used to indicate that the node's node pool should be + // create along with the node registration if it doesn't exist. + CreateNodePool bool + WriteRequest } @@ -2363,6 +2368,16 @@ func (n *Node) IsInPool(pool string) bool { return pool == NodePoolAll || n.NodePool == pool } +// HasEvent returns true if the node has the given message in its events list. +func (n *Node) HasEvent(msg string) bool { + for _, ev := range n.Events { + if ev.Message == msg { + return true + } + } + return false +} + // Stub returns a summarized version of the node func (n *Node) Stub(fields *NodeStubFields) *NodeListStub { diff --git a/website/content/docs/configuration/client.mdx b/website/content/docs/configuration/client.mdx index a9a287f65..2353a9a6c 100644 --- a/website/content/docs/configuration/client.mdx +++ b/website/content/docs/configuration/client.mdx @@ -88,8 +88,9 @@ client { - `node_pool` `(string: "default")` - Specifies the node pool in which the client is registered. If the node pool does not exist yet, it will be created - automatically when the node registers with the cluster. If not specified the - `default` node pool is used. + automatically if the node registers in the authoritative region. In + non-authoritative regions, the node is kept in the `initializing` status + until the node pool is created and replicated. - `options` ([Options](#options-parameters): nil) - Specifies a key-value mapping of internal configuration for clients, such as for driver