From 09fd05bd8fdce405c06e0cd6e390bd1b05dcf20f Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 11:25:55 -0400 Subject: [PATCH] node_endpoint raft store then shutdown, test deprecation --- nomad/node_endpoint.go | 36 +++++++++++++++++--------------- nomad/node_endpoint_test.go | 41 +++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 7459f83d7..cc0965e49 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -261,8 +261,14 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } // Verify the arguments + var nodeIDs []string if len(args.NodeIDs) == 0 { - return fmt.Errorf("missing node IDs for client deregistration") + if args.NodeID == "" { + return fmt.Errorf("missing node IDs for client deregistration") + } + nodeIDs = append(nodeIDs, args.NodeID) + } else if args.NodeID != "" { + return fmt.Errorf("use only NodeIDs, the NodeID field is deprecated") } // Open state handles @@ -273,8 +279,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No ws := memdb.NewWatchSet() - for _, nodeID := range args.NodeIDs { - // Look for the node + // Assert that the state contains the nodes + for _, nodeID := range nodeIDs { node, err := snap.NodeByID(ws, nodeID) if err != nil { return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) @@ -282,8 +288,17 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No if node == nil { return fmt.Errorf("node not found: %s", nodeID) } + } - // Clear the heartbeat timer if any + // Commit this update to Raft, before we clear the heartbeatTimer so that failure + // leaves the node running + _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) + if err != nil { + n.logger.Error("deregister failed", "error", err) + return err + } + + for _, nodeID := range nodeIDs { n.srv.clearHeartbeatTimer(nodeID) // If there are any Vault accessors on the node, revoke them @@ -300,19 +315,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No return err } } - } - // Commit this update via Raft - _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) - if err != nil { - n.logger.Error("deregister failed", "error", err) - return err - } - - // Create the evaluations for these nodes - for _, nodeID := range args.NodeIDs { - // QUESTION createNodeEvals opens it's own state and watch handles, does - // that break atomicity? + // Create the evaluations for these nodes evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) if err != nil { n.logger.Error("eval creation failed", "error", err) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 29ec2cc0f..84642d662 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -246,6 +246,47 @@ func TestClientEndpoint_Deregister(t *testing.T) { } } +func TestClientEndpoint_DeregisterDeprecatedField(t *testing.T) { + t.Parallel() + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) + + // Deregister fails, because the DeregisterRequest should use only the batch field + dereg := &structs.NodeDeregisterRequest{ + NodeID: "foo", + NodeIDs: []string{node.ID}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.Error(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) + + // Deregister succeeds using the pre-0.9.3 single node request + dereg = &structs.NodeDeregisterRequest{ + NodeID: node.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.NoError(t, err) + require.Nil(t, out) +} + func TestClientEndpoint_Deregister_ACL(t *testing.T) { t.Parallel() s1, root := TestACLServer(t, nil)