diff --git a/CHANGELOG.md b/CHANGELOG.md index e04020542..c605789ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ IMPROVEMENTS: * core: Removed deprecated upgrade path code pertaining to older versions of Nomad [[GH-5894](https://github.com/hashicorp/nomad/issues/5894)] + * core: Deregister nodes in batches rather than one at a time [[GH-5784](https://github.com/hashicorp/nomad/pull/5784)] * client: Improved task event display message to include kill timeout [[GH-5943](https://github.com/hashicorp/nomad/issues/5943)] * api: Used region from job hcl when not provided as query parameter in job registration and plan endpoints [[GH-5664](https://github.com/hashicorp/nomad/pull/5664)] * api: Inferred content type of file in alloc filesystem stat endpoint [[GH-5907](https://github.com/hashicorp/nomad/issues/5907)] diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md new file mode 100644 index 000000000..53e3578e2 --- /dev/null +++ b/contributing/checklist-rpc-endpoint.md @@ -0,0 +1,29 @@ +# New RPC Endpoint Checklist + +Prefer adding a new message over changing any existing RPC message. + +## Code + +* [ ] `Request` struct and `*RequestType` constant in + `nomad/structs/structs.go`. Append the new constant; existing + constant values must remain unchanged +* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply` + * `*nomadFSM` method to decode the request and call the state method +* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go` + * `nomad/state/state_store_test.go` +* [ ] Handler for the request in `nomad/foo_endpoint.go` + * RPCs are resolved by matching the method name for bound structs + [net/rpc](https://golang.org/pkg/net/rpc/) +* [ ] Wrapper for the HTTP request in `command/agent/foo_endpoint.go` + * Backwards compatibility requires a new endpoint: an upgraded + client or server may forward this request to an old server + that lacks support for the new RPC + * RPCs triggered by an internal process may not need an HTTP wrapper +* [ ] `nomad/core_sched.go` sends many RPCs + * `ServersMeetMinimumVersion` asserts that the server cluster is + upgraded, so use it to guard sending the new RPC; otherwise send + the old RPC (a minimal sketch of this guard follows) + * The version must match the actual release version!
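The guard bullet above is worth seeing in miniature. The following is a runnable toy sketch, not Nomad's real helper: Nomad's `ServersMeetMinimumVersion` inspects serf members (and takes a flag controlling how failed servers are treated), while this stand-in just walks a plain version list. Only the `github.com/hashicorp/go-version` calls are the real API; the cluster versions are made up for illustration.

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

// serversMeetMinimumVersion is a toy stand-in for Nomad's helper: it reports
// whether every server in the cluster runs at least the given version.
func serversMeetMinimumVersion(cluster []*version.Version, min *version.Version) bool {
	for _, v := range cluster {
		if v.LessThan(min) {
			return false
		}
	}
	return true
}

func main() {
	min := version.Must(version.NewVersion("0.9.4"))
	cluster := []*version.Version{
		version.Must(version.NewVersion("0.9.4")),
		version.Must(version.NewVersion("0.9.3")), // one straggler blocks the new RPC
	}
	if serversMeetMinimumVersion(cluster, min) {
		fmt.Println("all servers upgraded: send the new batch RPC")
	} else {
		fmt.Println("old servers present: fall back to the old RPC") // printed here
	}
}
```

`nodeReap` in `nomad/core_sched.go` below is the real instance of this pattern: it checks the cluster version once, then chooses between the old `Node.Deregister` and the new `Node.BatchDeregister` message.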
+ +## Docs + +* [ ] Changelog diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 3d691d605..1fb7330ea 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -7,6 +7,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -482,19 +483,42 @@ OUTER: return nil } c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode)) + return c.nodeReap(eval, gcNode) +} + +func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error { + // COMPAT(0.11): for clusters that may still contain pre-0.9.4 servers, send single deregistration messages + minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4")) + if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) { + for _, id := range nodeIDs { + req := structs.NodeDeregisterRequest{ + NodeID: id, + WriteRequest: structs.WriteRequest{ + Region: c.srv.config.Region, + AuthToken: eval.LeaderACL, + }, + } + var resp structs.NodeUpdateResponse + if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil { + c.logger.Error("node reap failed", "node_id", id, "error", err) + return err + } + } + return nil + } // Call to the leader to issue the reap - for _, nodeID := range gcNode { - req := structs.NodeDeregisterRequest{ - NodeID: nodeID, + for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) { + req := structs.NodeBatchDeregisterRequest{ + NodeIDs: ids, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, AuthToken: eval.LeaderACL, }, } var resp structs.NodeUpdateResponse - if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil { - c.logger.Error("node reap failed", "node_id", nodeID, "error", err) + if err := c.srv.RPC("Node.BatchDeregister", &req, &resp); err != nil { + c.logger.Error("node reap failed", "node_ids", ids, "error", err) return err } } diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index efabefc7c..d29086dce 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -126,7 +126,7 @@ func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) { require.Equal(n, tracked[n.ID]) // Delete the node - require.Nil(state.DeleteNode(101, n.ID)) + require.Nil(state.DeleteNode(101, []string{n.ID})) testutil.WaitForResult(func() (bool, error) { return len(m.events()) == 2, nil }, func(err error) { diff --git a/nomad/fsm.go b/nomad/fsm.go index 49392d292..50a397e0b 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -249,6 +249,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyBatchDrainUpdate(buf[1:], log.Index) case structs.SchedulerConfigRequestType: return n.applySchedulerConfigUpdate(buf[1:], log.Index) + case structs.NodeBatchDeregisterRequestType: + return n.applyDeregisterNodeBatch(buf[1:], log.Index) } // Check enterprise only message types.
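The batching loop in `nodeReap` above splits the eligible node IDs with `partitionAll`, which this diff adds to `nomad/util.go` further down; `maxIdsPerReap` is a cap defined elsewhere in `core_sched.go` and not shown in this diff. A self-contained sketch of the batch math, with the helper body copied from the diff and a made-up cap of 2:

```go
package main

import "fmt"

// partitionAll splits xs into chunks of at most size elements, preserving
// order; the last chunk may be smaller. Body copied from nomad/util.go below.
func partitionAll(size int, xs []string) [][]string {
	if size < 1 {
		return [][]string{xs}
	}
	out := [][]string{}
	for i := 0; i < len(xs); i += size {
		j := i + size
		if j > len(xs) {
			j = len(xs)
		}
		out = append(out, xs[i:j])
	}
	return out
}

func main() {
	ids := []string{"n1", "n2", "n3", "n4", "n5"}
	// With a cap of 2, five node IDs become three Node.BatchDeregister RPCs:
	// [n1 n2], [n3 n4], [n5].
	for _, batch := range partitionAll(2, ids) {
		fmt.Println(batch)
	}
}
```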
@@ -296,10 +298,26 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteNode(index, req.NodeID); err != nil { + if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } + + return nil +} + +func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now()) + var req structs.NodeBatchDeregisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteNode(index, req.NodeIDs); err != nil { + n.logger.Error("DeleteNode failed", "error", err) + return err + } + return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef79d1461..d75ba1ee3 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.NodeDeregisterRequest{ - NodeID: node.ID, + req2 := structs.NodeBatchDeregisterRequest{ + NodeIDs: []string{node.ID}, } - buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2) + buf, err = structs.Encode(structs.NodeBatchDeregisterRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 1fed74bbc..b5c96c7cf 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -253,17 +253,49 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) - // Check node permissions + if args.NodeID == "" { + return fmt.Errorf("missing node ID for client deregistration") + } + + // deregister takes a batch + repack := &structs.NodeBatchDeregisterRequest{ + NodeIDs: []string{args.NodeID}, + WriteRequest: args.WriteRequest, + } + + return n.deregister(repack, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeDeregisterRequestType, args) + }) +} + +// BatchDeregister is used to remove client nodes from the cluster. 
+func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { + if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now()) + + if len(args.NodeIDs) == 0 { + return fmt.Errorf("missing node IDs for client deregistration") + } + + return n.deregister(args, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) + }) +} + +// deregister takes a raftApplyFn closure so it can serve both Deregister and BatchDeregister +func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, + reply *structs.NodeUpdateResponse, + raftApplyFn func() (interface{}, uint64, error), +) error { + // Check request permissions if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNodeWrite() { return structs.ErrPermissionDenied } - // Verify the arguments - if args.NodeID == "" { - return fmt.Errorf("missing node ID for client deregistration") - } // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { @@ -271,49 +303,56 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } ws := memdb.NewWatchSet() - node, err := snap.NodeByID(ws, args.NodeID) - if err != nil { - return err - } - if node == nil { - return fmt.Errorf("node not found") - } - - // Commit this update via Raft - _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) - if err != nil { - n.logger.Error("deregister failed", "error", err) - return err - } - - // Clear the heartbeat timer if any - n.srv.clearHeartbeatTimer(args.NodeID) - - // Create the evaluations for this node - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) - if err != nil { - n.logger.Error("eval creation failed", "error", err) - return err - } - - // Determine if there are any Vault accessors on the node - accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID) - if err != nil { - n.logger.Error("looking up accessors for node failed", "node_id", args.NodeID, "error", err) - return err - } - - if l := len(accessors); l != 0 { - n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", args.NodeID) - if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { - n.logger.Error("revoking accessors for node failed", "node_id", args.NodeID, "error", err) + for _, nodeID := range args.NodeIDs { + node, err := snap.NodeByID(ws, nodeID) + if err != nil { return err } + if node == nil { + return fmt.Errorf("node not found: %s", nodeID) + } + } + + // Commit this update via Raft + _, index, err := raftApplyFn() + if err != nil { + n.logger.Error("raft message failed", "error", err) + return err + } + + for _, nodeID := range args.NodeIDs { + // Clear the heartbeat timer if any + n.srv.clearHeartbeatTimer(nodeID) + + // Create the evaluations for this node + evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) + if err != nil { + n.logger.Error("eval creation failed", "error", err) + return err + } + + // Determine if there are any Vault accessors on the node + accessors, err := snap.VaultAccessorsByNode(ws, nodeID) + if err != nil { + n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err) + return err + } + + if l := len(accessors); l != 0 { + n.logger.Debug("revoking accessors on node due to deregister",
"num_accessors", l, "node_id", nodeID) + if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { + n.logger.Error("revoking accessors for node failed", "node_id", nodeID, "error", err) + return err + } + } + + reply.EvalIDs = append(reply.EvalIDs, evalIDs...) + // Set the reply eval create index just the first time + if reply.EvalCreateIndex == 0 { + reply.EvalCreateIndex = evalIndex + } } - // Setup the reply - reply.EvalIDs = evalIDs - reply.EvalCreateIndex = evalIndex reply.NodeModifyIndex = index reply.Index = index return nil diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 6b7b90e2f..00e7706f8 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -201,7 +201,8 @@ func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { } } -func TestClientEndpoint_Deregister(t *testing.T) { +// Test the deprecated single node deregistration path +func TestClientEndpoint_DeregisterOne(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) defer s1.Shutdown() @@ -269,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) // Deregister without any token and expect it to fail - dereg := &structs.NodeDeregisterRequest{ - NodeID: node.ID, + dereg := &structs.NodeBatchDeregisterRequest{ + NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil { t.Fatalf("node de-register succeeded") } // Deregister with a valid token dereg.AuthToken = validToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -295,18 +296,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { } // Deregister with an invalid token. 
- dereg1 := &structs.NodeDeregisterRequest{ - NodeID: node1.ID, + dereg1 := &structs.NodeBatchDeregisterRequest{ + NodeIDs: []string{node1.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } dereg1.AuthToken = invalidToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil { t.Fatalf("rpc should not have succeeded") } // Try with a root token dereg1.AuthToken = root.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil { t.Fatalf("err: %v", err) } } @@ -344,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) { state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2}) // Deregister - dereg := &structs.NodeDeregisterRequest{ - NodeID: node.ID, + dereg := &structs.NodeBatchDeregisterRequest{ + NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil { t.Fatalf("err: %v", err) } if resp2.Index == 0 { @@ -1447,7 +1448,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) { // Node delete triggers watches time.AfterFunc(100*time.Millisecond, func() { - if err := state.DeleteNode(400, node2.ID); err != nil { + if err := state.DeleteNode(400, []string{node2.ID}); err != nil { t.Fatalf("err: %v", err) } }) @@ -2714,7 +2715,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { // Node delete triggers watches. 
time.AfterFunc(100*time.Millisecond, func() { - errCh <- state.DeleteNode(50, node.ID) + errCh <- state.DeleteNode(50, []string{node.ID}) }) req.MinQueryIndex = 45 diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9763d6c12..a384e2b4d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -677,24 +677,30 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { return nil } -// DeleteNode is used to deregister a node -func (s *StateStore) DeleteNode(index uint64, nodeID string) error { +// DeleteNode deregisters a batch of nodes +func (s *StateStore) DeleteNode(index uint64, nodes []string) error { + if len(nodes) == 0 { + return fmt.Errorf("node ids missing") + } + txn := s.db.Txn(true) defer txn.Abort() - // Lookup the node - existing, err := txn.First("nodes", "id", nodeID) - if err != nil { - return fmt.Errorf("node lookup failed: %v", err) - } - if existing == nil { - return fmt.Errorf("node not found") + for _, nodeID := range nodes { + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) + } + if existing == nil { + return fmt.Errorf("node not found: %s", nodeID) + } + + // Delete the node + if err := txn.Delete("nodes", existing); err != nil { + return fmt.Errorf("node delete failed: %s: %v", nodeID, err) + } } - // Delete the node - if err := txn.Delete("nodes", existing); err != nil { - return fmt.Errorf("node delete failed: %v", err) - } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d21e4bbde..68e6d4ab3 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -809,49 +809,45 @@ func TestStateStore_UpsertNode_Node(t *testing.T) { func TestStateStore_DeleteNode_Node(t *testing.T) { state := testStateStore(t) - node := mock.Node() - err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } + // Create and insert two nodes, which we'll delete + node0 := mock.Node() + node1 := mock.Node() + err := state.UpsertNode(1000, node0) + require.NoError(t, err) + err = state.UpsertNode(1001, node1) + require.NoError(t, err) // Create a watchset so we can test that delete fires the watch ws := memdb.NewWatchSet() - if _, err := state.NodeByID(ws, node.ID); err != nil { - t.Fatalf("bad: %v", err) - } - err = state.DeleteNode(1001, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + // Check that both nodes are not nil + out, err := state.NodeByID(ws, node0.ID) + require.NoError(t, err) + require.NotNil(t, out) + out, err = state.NodeByID(ws, node1.ID) + require.NoError(t, err) + require.NotNil(t, out) - if !watchFired(ws) { - t.Fatalf("bad") - } + // Delete both nodes in a batch, fires the watch + err = state.DeleteNode(1002, []string{node0.ID, node1.ID}) + require.NoError(t, err) + require.True(t, watchFired(ws)) + // Check that both nodes are nil ws = memdb.NewWatchSet() - out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if out != nil { - t.Fatalf("bad: %#v %#v", node, out) - } + out, err = state.NodeByID(ws, node0.ID) + require.NoError(t, err) + require.Nil(t, out) + out, err = state.NodeByID(ws, node1.ID) + require.NoError(t, err) + require.Nil(t, out) + // Ensure that the index is still at 1002, from DeleteNode index, err := state.Index("nodes") - if err != nil { - t.Fatalf("err: 
%v", err) - } - if index != 1001 { - t.Fatalf("bad: %d", index) - } - - if watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(t, err) + require.Equal(t, uint64(1002), index) + require.False(t, watchFired(ws)) } func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dee93eee7..c6145e14e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -83,6 +83,7 @@ const ( NodeUpdateEligibilityRequestType BatchNodeUpdateDrainRequestType SchedulerConfigRequestType + NodeBatchDeregisterRequestType ) const ( @@ -322,6 +323,13 @@ type NodeDeregisterRequest struct { WriteRequest } +// NodeBatchDeregisterRequest is used for Node.BatchDeregister endpoint +// to deregister a batch of nodes from being schedulable entities. +type NodeBatchDeregisterRequest struct { + NodeIDs []string + WriteRequest +} + // NodeServerInfo is used to in NodeUpdateResponse to return Nomad server // information used in RPC server lists. type NodeServerInfo struct { diff --git a/nomad/util.go b/nomad/util.go index ccd4504af..055b39e2e 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -196,6 +196,27 @@ func shuffleStrings(list []string) { } } +// partitionAll splits a slice of strings into a slice of slices of strings, each with a max +// size of `size`. All entries from the original slice are preserved. The last slice may be +// smaller than `size`. The input slice is unmodified +func partitionAll(size int, xs []string) [][]string { + if size < 1 { + return [][]string{xs} + } + + out := [][]string{} + + for i := 0; i < len(xs); i += size { + j := i + size + if j > len(xs) { + j = len(xs) + } + out = append(out, xs[i:j]) + } + + return out +} + // maxUint64 returns the maximum value func maxUint64(inputs ...uint64) uint64 { l := len(inputs) diff --git a/nomad/util_test.go b/nomad/util_test.go index f216d5e42..b1df2e523 100644 --- a/nomad/util_test.go +++ b/nomad/util_test.go @@ -8,6 +8,7 @@ import ( version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" ) func TestIsNomadServer(t *testing.T) { @@ -230,6 +231,21 @@ func TestShuffleStrings(t *testing.T) { } } +func Test_partitionAll(t *testing.T) { + xs := []string{"a", "b", "c", "d", "e", "f"} + // evenly divisible + require.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e", "f"}}, partitionAll(2, xs)) + require.Equal(t, [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, partitionAll(3, xs)) + // whole thing fits int the last part + require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(7, xs)) + // odd remainder + require.Equal(t, [][]string{{"a", "b", "c", "d"}, {"e", "f"}}, partitionAll(4, xs)) + // zero size + require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(0, xs)) + // one size + require.Equal(t, [][]string{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}, {"f"}}, partitionAll(1, xs)) +} + func TestMaxUint64(t *testing.T) { t.Parallel() if maxUint64(1, 2) != 2 {