Merge pull request #5784 from hashicorp/b-batch-node-dereg

batch node deregistration
Lang Martin 2019-07-10 14:24:54 -04:00 committed by GitHub
commit b8b45711f3
13 changed files with 273 additions and 114 deletions


@@ -2,6 +2,7 @@
IMPROVEMENTS:
* core: Removed deprecated upgrade path code pertaining to older versions of Nomad [[GH-5894](https://github.com/hashicorp/nomad/issues/5894)]
* core: Deregister nodes in batches rather than one at a time [[GH-5784](https://github.com/hashicorp/nomad/pull/5784)]
* client: Improved task event display message to include kill timeout [[GH-5943](https://github.com/hashicorp/nomad/issues/5943)]
* api: Used region from job hcl when not provided as query parameter in job registration and plan endpoints [[GH-5664](https://github.com/hashicorp/nomad/pull/5664)]
* api: Inferred content type of file in alloc filesystem stat endpoint [[GH-5907](https://github.com/hashicorp/nomad/issues/5907)]


@@ -0,0 +1,29 @@
# New RPC Endpoint Checklist
Prefer adding a new message over changing any existing RPC messages.
## Code
* [ ] `Request` struct and `*RequestType` constant in
`nomad/structs/structs.go`. Append the constant; existing constant
values must remain unchanged
* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply`
* `*nomadFSM` method to decode the request and call the state method
* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go`
* `nomad/state/state_store_test.go`
* [ ] Handler for the request in `nomad/foo_endpoint.go`
* RPCs are resolved by matching the method name of bound structs; see
[net/rpc](https://golang.org/pkg/net/rpc/)
* Wrapper for the HTTP request in `command/agent/foo_endpoint.go`
* Backwards compatibility requires a new endpoint; an upgraded
client or server may forward this request to an old server
that has no support for the new RPC
* RPCs triggered only by an internal process may not need an HTTP wrapper
* [ ] `nomad/core_sched.go` sends many RPCs
* `ServersMeetMinimumVersion` asserts that the server cluster is
upgraded, so use it to guard sending the new RPC, otherwise send the
old RPC (see the sketch below this list)
* Version must match the actual release version!
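
For concreteness, a minimal sketch of that upgrade guard, using the `github.com/hashicorp/go-version` library; `membersMeetMinimumVersion` and the version strings here are illustrative stand-ins for Nomad's `ServersMeetMinimumVersion`, not its real signature:

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

// membersMeetMinimumVersion is a stand-in for ServersMeetMinimumVersion: it
// reports whether every server in the cluster runs at least minVersion.
func membersMeetMinimumVersion(memberVersions []string, minVersion *version.Version) bool {
	for _, raw := range memberVersions {
		v, err := version.NewVersion(raw)
		if err != nil || v.LessThan(minVersion) {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical cluster state: one server has not been upgraded yet.
	cluster := []string{"0.9.4", "0.9.4", "0.9.3"}

	// The new batched RPC is only safe once every server understands it.
	minBatch := version.Must(version.NewVersion("0.9.4"))

	if membersMeetMinimumVersion(cluster, minBatch) {
		fmt.Println("send the new Node.BatchDeregister RPC")
	} else {
		fmt.Println("fall back to one Node.Deregister RPC per node")
	}
}
```

The `0.9.4` threshold mirrors the `minVersionBatchNodeDeregister` guard added to `nomad/core_sched.go` in this change.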
## Docs
* [ ] Changelog


@@ -7,6 +7,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
@@ -482,19 +483,42 @@ OUTER:
return nil
}
c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode))
return c.nodeReap(eval, gcNode)
}
func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
// For old clusters, send single deregistration messages COMPAT(0.11)
minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
for _, id := range nodeIDs {
req := structs.NodeDeregisterRequest{
NodeID: id,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
AuthToken: eval.LeaderACL,
},
}
var resp structs.NodeUpdateResponse
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_id", id, "error", err)
return err
}
}
return nil
}
// Call to the leader to issue the reap
for _, nodeID := range gcNode {
req := structs.NodeDeregisterRequest{
NodeID: nodeID,
for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) {
req := structs.NodeBatchDeregisterRequest{
NodeIDs: ids,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
AuthToken: eval.LeaderACL,
},
}
var resp structs.NodeUpdateResponse
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_id", nodeID, "error", err)
if err := c.srv.RPC("Node.BatchDeregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_ids", ids, "error", err)
return err
}
}


@@ -126,7 +126,7 @@ func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) {
require.Equal(n, tracked[n.ID])
// Delete the node
require.Nil(state.DeleteNode(101, n.ID))
require.Nil(state.DeleteNode(101, []string{n.ID}))
testutil.WaitForResult(func() (bool, error) {
return len(m.events()) == 2, nil
}, func(err error) {


@@ -249,6 +249,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
}
// Check enterprise only message types.
@@ -296,10 +298,26 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNode(index, req.NodeID); err != nil {
if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now())
var req structs.NodeBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNode(index, req.NodeIDs); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
return nil
}
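
The `fsm.go` change above follows the dispatch pattern from the checklist: read the message-type byte, decode the payload, and apply it against state. A toy sketch of that shape, assuming JSON in place of Nomad's msgpack-based `structs.Decode` and a plain map in place of the state store (the constant values here are illustrative, not Nomad's real ones):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message-type constants are appended, never reordered, so values already
// written to the Raft log keep their meaning across upgrades.
const (
	NodeDeregisterRequestType uint8 = iota
	// ... existing types elided ...
	NodeBatchDeregisterRequestType
)

type NodeBatchDeregisterRequest struct {
	NodeIDs []string
}

// toyFSM stands in for nomadFSM; nodes stands in for the state store.
type toyFSM struct {
	nodes map[string]bool
}

// Apply dispatches on the leading type byte, decodes the rest of the buffer,
// and applies the request against state.
func (f *toyFSM) Apply(buf []byte) error {
	switch buf[0] {
	case NodeBatchDeregisterRequestType:
		var req NodeBatchDeregisterRequest
		if err := json.Unmarshal(buf[1:], &req); err != nil {
			return fmt.Errorf("failed to decode request: %v", err)
		}
		for _, id := range req.NodeIDs {
			delete(f.nodes, id)
		}
		return nil
	default:
		return fmt.Errorf("unknown message type %d", buf[0])
	}
}

func main() {
	fsm := &toyFSM{nodes: map[string]bool{"n1": true, "n2": true, "n3": true}}

	payload, _ := json.Marshal(NodeBatchDeregisterRequest{NodeIDs: []string{"n1", "n2"}})
	msg := append([]byte{NodeBatchDeregisterRequestType}, payload...)

	if err := fsm.Apply(msg); err != nil {
		panic(err)
	}
	fmt.Println("nodes remaining:", len(fsm.nodes)) // 1
}
```

Because the type byte is persisted in the Raft log, new constants are appended rather than inserted, which is why the checklist insists that existing values stay unchanged.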


@@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
req2 := structs.NodeDeregisterRequest{
NodeID: node.ID,
req2 := structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
}
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
buf, err = structs.Encode(structs.NodeBatchDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}


@@ -253,17 +253,49 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())
// Check node permissions
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}
// deregister takes a batch
repack := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{args.NodeID},
WriteRequest: args.WriteRequest,
}
return n.deregister(repack, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
})
}
// BatchDeregister is used to remove client nodes from the cluster.
func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())
if len(args.NodeIDs) == 0 {
return fmt.Errorf("missing node IDs for client deregistration")
}
return n.deregister(args, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
})
}
// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
reply *structs.NodeUpdateResponse,
raftApplyFn func() (interface{}, uint64, error),
) error {
// Check request permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
@@ -271,49 +303,56 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}
ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
if err != nil {
n.logger.Error("deregister failed", "error", err)
return err
}
// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(args.NodeID)
// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}
// Determine if there are any Vault accessors on the node
accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID)
if err != nil {
n.logger.Error("looking up accessors for node failed", "node_id", args.NodeID, "error", err)
return err
}
if l := len(accessors); l != 0 {
n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", args.NodeID)
if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
n.logger.Error("revoking accessors for node failed", "node_id", args.NodeID, "error", err)
for _, nodeID := range args.NodeIDs {
node, err := snap.NodeByID(ws, nodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
}
// Commit this update via Raft
_, index, err := raftApplyFn()
if err != nil {
n.logger.Error("raft message failed", "error", err)
return err
}
for _, nodeID := range args.NodeIDs {
// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(nodeID)
// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}
// Determine if there are any Vault accessors on the node
accessors, err := snap.VaultAccessorsByNode(ws, nodeID)
if err != nil {
n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err)
return err
}
if l := len(accessors); l != 0 {
n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", nodeID)
if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
n.logger.Error("revoking accessors for node failed", "node_id", nodeID, "error", err)
return err
}
}
reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
// Set the reply eval create index just the first time
if reply.EvalCreateIndex == 0 {
reply.EvalCreateIndex = evalIndex
}
}
// Setup the reply
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
reply.NodeModifyIndex = index
reply.Index = index
return nil
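
Both `Deregister` and `BatchDeregister` now funnel into the shared `deregister` helper by passing the Raft apply as a closure, so the permission check, node lookups, heartbeat, eval, and Vault cleanup live in one code path. A stripped-down sketch of that wiring, with hypothetical types and hard-coded indexes standing in for `raftApply`:

```go
package main

import "fmt"

// batchRequest and response are hypothetical stand-ins for Nomad's structs.
type batchRequest struct {
	IDs []string
}

type response struct {
	Index uint64
}

// applyFn mirrors the raftApplyFn closure: commit the request to Raft and
// return the resulting index.
type applyFn func() (uint64, error)

// deregister holds the shared logic; callers choose which Raft message to send.
func deregister(req *batchRequest, reply *response, apply applyFn) error {
	if len(req.IDs) == 0 {
		return fmt.Errorf("missing IDs")
	}
	index, err := apply()
	if err != nil {
		return err
	}
	// Per-ID cleanup (heartbeats, evals, token revocation) would happen here.
	reply.Index = index
	return nil
}

// DeregisterOne repacks a single ID into a batch and sends the old message type.
func DeregisterOne(id string, reply *response) error {
	return deregister(&batchRequest{IDs: []string{id}}, reply, func() (uint64, error) {
		return 100, nil // pretend: raftApply(OldRequestType, ...)
	})
}

// DeregisterBatch sends the new batched message type.
func DeregisterBatch(ids []string, reply *response) error {
	return deregister(&batchRequest{IDs: ids}, reply, func() (uint64, error) {
		return 101, nil // pretend: raftApply(NewBatchRequestType, ...)
	})
}

func main() {
	var r response
	if err := DeregisterOne("n1", &r); err != nil {
		panic(err)
	}
	if err := DeregisterBatch([]string{"n2", "n3"}, &r); err != nil {
		panic(err)
	}
	fmt.Println("last raft index:", r.Index)
}
```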


@@ -201,7 +201,8 @@ func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
}
}
func TestClientEndpoint_Deregister(t *testing.T) {
// Test the deprecated single node deregistration path
func TestClientEndpoint_DeregisterOne(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
@@ -269,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) {
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))
// Deregister without any token and expect it to fail
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil {
t.Fatalf("node de-register succeeded")
}
// Deregister with a valid token
dereg.AuthToken = validToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
@@ -295,18 +296,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) {
}
// Deregister with an invalid token.
dereg1 := &structs.NodeDeregisterRequest{
NodeID: node1.ID,
dereg1 := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node1.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
dereg1.AuthToken = invalidToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil {
t.Fatalf("rpc should not have succeeded")
}
// Try with a root token
dereg1.AuthToken = root.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil {
t.Fatalf("err: %v", err)
}
}
@@ -344,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {
state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2})
// Deregister
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
@@ -1447,7 +1448,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {
// Node delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteNode(400, node2.ID); err != nil {
if err := state.DeleteNode(400, []string{node2.ID}); err != nil {
t.Fatalf("err: %v", err)
}
})
@@ -2714,7 +2715,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
// Node delete triggers watches.
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.DeleteNode(50, node.ID)
errCh <- state.DeleteNode(50, []string{node.ID})
})
req.MinQueryIndex = 45


@@ -677,24 +677,30 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
return nil
}
// DeleteNode is used to deregister a node
func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
// DeleteNode deregisters a batch of nodes
func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
if len(nodes) == 0 {
return fmt.Errorf("node ids missing")
}
txn := s.db.Txn(true)
defer txn.Abort()
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
return fmt.Errorf("node lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("node not found")
for _, nodeID := range nodes {
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
}
if existing == nil {
return fmt.Errorf("node not found: %s", nodeID)
}
// Delete the node
if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %s: %v", nodeID, err)
}
}
// Delete the node
if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
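
The reworked `DeleteNode` performs every lookup and delete inside a single write transaction, so a batch either applies as a whole or aborts. A runnable sketch of the same pattern against a bare `go-memdb` table; the schema and `Node` type here are invented for illustration:

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type Node struct {
	ID string
}

func main() {
	// Minimal schema with a single "nodes" table indexed by ID.
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"nodes": {
				Name: "nodes",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// Seed a few nodes.
	txn := db.Txn(true)
	for _, id := range []string{"n1", "n2", "n3"} {
		if err := txn.Insert("nodes", &Node{ID: id}); err != nil {
			panic(err)
		}
	}
	txn.Commit()

	// Delete a batch inside one write transaction: any failure aborts the lot.
	deleteNodes := func(ids []string) error {
		txn := db.Txn(true)
		defer txn.Abort()
		for _, id := range ids {
			existing, err := txn.First("nodes", "id", id)
			if err != nil {
				return fmt.Errorf("node lookup failed: %s: %v", id, err)
			}
			if existing == nil {
				return fmt.Errorf("node not found: %s", id)
			}
			if err := txn.Delete("nodes", existing); err != nil {
				return fmt.Errorf("node delete failed: %s: %v", id, err)
			}
		}
		txn.Commit()
		return nil
	}

	if err := deleteNodes([]string{"n1", "n2"}); err != nil {
		panic(err)
	}

	fmt.Println("batch deleted; n3 remains")
}
```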


@@ -809,49 +809,45 @@ func TestStateStore_UpsertNode_Node(t *testing.T) {
func TestStateStore_DeleteNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create and insert two nodes, which we'll delete
node0 := mock.Node()
node1 := mock.Node()
err := state.UpsertNode(1000, node0)
require.NoError(t, err)
err = state.UpsertNode(1001, node1)
require.NoError(t, err)
// Create a watchset so we can test that delete fires the watch
ws := memdb.NewWatchSet()
if _, err := state.NodeByID(ws, node.ID); err != nil {
t.Fatalf("bad: %v", err)
}
err = state.DeleteNode(1001, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check that both nodes are not nil
out, err := state.NodeByID(ws, node0.ID)
require.NoError(t, err)
require.NotNil(t, out)
out, err = state.NodeByID(ws, node1.ID)
require.NoError(t, err)
require.NotNil(t, out)
if !watchFired(ws) {
t.Fatalf("bad")
}
// Delete both nodes in a batch, fires the watch
err = state.DeleteNode(1002, []string{node0.ID, node1.ID})
require.NoError(t, err)
require.True(t, watchFired(ws))
// Check that both nodes are nil
ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %#v %#v", node, out)
}
out, err = state.NodeByID(ws, node0.ID)
require.NoError(t, err)
require.Nil(t, out)
out, err = state.NodeByID(ws, node1.ID)
require.NoError(t, err)
require.Nil(t, out)
// Ensure that the index is still at 1002, from DeleteNode
index, err := state.Index("nodes")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
if watchFired(ws) {
t.Fatalf("bad")
}
require.NoError(t, err)
require.Equal(t, uint64(1002), index)
require.False(t, watchFired(ws))
}
func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {


@@ -83,6 +83,7 @@ const (
NodeUpdateEligibilityRequestType
BatchNodeUpdateDrainRequestType
SchedulerConfigRequestType
NodeBatchDeregisterRequestType
)
const (
@@ -322,6 +323,13 @@ type NodeDeregisterRequest struct {
WriteRequest
}
// NodeBatchDeregisterRequest is used for Node.BatchDeregister endpoint
// to deregister a batch of nodes from being schedulable entities.
type NodeBatchDeregisterRequest struct {
NodeIDs []string
WriteRequest
}
// NodeServerInfo is used to in NodeUpdateResponse to return Nomad server
// information used in RPC server lists.
type NodeServerInfo struct {


@@ -196,6 +196,27 @@ func shuffleStrings(list []string) {
}
}
// partitionAll splits a slice of strings into a slice of slices of strings, each with a max
// size of `size`. All entries from the original slice are preserved. The last slice may be
// smaller than `size`. The input slice is unmodified
func partitionAll(size int, xs []string) [][]string {
if size < 1 {
return [][]string{xs}
}
out := [][]string{}
for i := 0; i < len(xs); i += size {
j := i + size
if j > len(xs) {
j = len(xs)
}
out = append(out, xs[i:j])
}
return out
}
// maxUint64 returns the maximum value
func maxUint64(inputs ...uint64) uint64 {
l := len(inputs)


@@ -8,6 +8,7 @@ import (
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
)
func TestIsNomadServer(t *testing.T) {
@@ -230,6 +231,21 @@ func TestShuffleStrings(t *testing.T) {
}
}
func Test_partitionAll(t *testing.T) {
xs := []string{"a", "b", "c", "d", "e", "f"}
// evenly divisible
require.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e", "f"}}, partitionAll(2, xs))
require.Equal(t, [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, partitionAll(3, xs))
// whole thing fits in the last part
require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(7, xs))
// odd remainder
require.Equal(t, [][]string{{"a", "b", "c", "d"}, {"e", "f"}}, partitionAll(4, xs))
// zero size
require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(0, xs))
// one size
require.Equal(t, [][]string{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}, {"f"}}, partitionAll(1, xs))
}
func TestMaxUint64(t *testing.T) {
t.Parallel()
if maxUint64(1, 2) != 2 {