node_endpoint raft store then shutdown, test deprecation
This commit is contained in:
parent
4610c70777
commit
09fd05bd8f
|
@ -261,9 +261,15 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the arguments
|
// Verify the arguments
|
||||||
|
var nodeIDs []string
|
||||||
if len(args.NodeIDs) == 0 {
|
if len(args.NodeIDs) == 0 {
|
||||||
|
if args.NodeID == "" {
|
||||||
return fmt.Errorf("missing node IDs for client deregistration")
|
return fmt.Errorf("missing node IDs for client deregistration")
|
||||||
}
|
}
|
||||||
|
nodeIDs = append(nodeIDs, args.NodeID)
|
||||||
|
} else if args.NodeID != "" {
|
||||||
|
return fmt.Errorf("use only NodeIDs, the NodeID field is deprecated")
|
||||||
|
}
|
||||||
|
|
||||||
// Open state handles
|
// Open state handles
|
||||||
snap, err := n.srv.fsm.State().Snapshot()
|
snap, err := n.srv.fsm.State().Snapshot()
|
||||||
|
@ -273,8 +279,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
|
|
||||||
for _, nodeID := range args.NodeIDs {
|
// Assert that the state contains the nodes
|
||||||
// Look for the node
|
for _, nodeID := range nodeIDs {
|
||||||
node, err := snap.NodeByID(ws, nodeID)
|
node, err := snap.NodeByID(ws, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
|
return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
|
||||||
|
@ -282,8 +288,17 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return fmt.Errorf("node not found: %s", nodeID)
|
return fmt.Errorf("node not found: %s", nodeID)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Clear the heartbeat timer if any
|
// Commit this update to Raft, before we clear the heartbeatTimer so that failure
|
||||||
|
// leaves the node running
|
||||||
|
_, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
|
||||||
|
if err != nil {
|
||||||
|
n.logger.Error("deregister failed", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeID := range nodeIDs {
|
||||||
n.srv.clearHeartbeatTimer(nodeID)
|
n.srv.clearHeartbeatTimer(nodeID)
|
||||||
|
|
||||||
// If there are any Vault accessors on the node, revoke them
|
// If there are any Vault accessors on the node, revoke them
|
||||||
|
@ -300,19 +315,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Commit this update via Raft
|
|
||||||
_, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
|
|
||||||
if err != nil {
|
|
||||||
n.logger.Error("deregister failed", "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the evaluations for these nodes
|
// Create the evaluations for these nodes
|
||||||
for _, nodeID := range args.NodeIDs {
|
|
||||||
// QUESTION createNodeEvals opens it's own state and watch handles, does
|
|
||||||
// that break atomicity?
|
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
|
|
|
@ -246,6 +246,47 @@ func TestClientEndpoint_Deregister(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientEndpoint_DeregisterDeprecatedField(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
s1 := TestServer(t, nil)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
|
// Create the register request
|
||||||
|
node := mock.Node()
|
||||||
|
reg := &structs.NodeRegisterRequest{
|
||||||
|
Node: node,
|
||||||
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the response
|
||||||
|
var resp structs.GenericResponse
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
|
||||||
|
|
||||||
|
// Deregister fails, because the DeregisterRequest should use only the batch field
|
||||||
|
dereg := &structs.NodeDeregisterRequest{
|
||||||
|
NodeID: "foo",
|
||||||
|
NodeIDs: []string{node.ID},
|
||||||
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||||
|
}
|
||||||
|
require.Error(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp))
|
||||||
|
|
||||||
|
// Deregister succeeds using the pre-0.9.3 single node request
|
||||||
|
dereg = &structs.NodeDeregisterRequest{
|
||||||
|
NodeID: node.ID,
|
||||||
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||||
|
}
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp))
|
||||||
|
|
||||||
|
// Check for the node in the FSM
|
||||||
|
state := s1.fsm.State()
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
out, err := state.NodeByID(ws, node.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, out)
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientEndpoint_Deregister_ACL(t *testing.T) {
|
func TestClientEndpoint_Deregister_ACL(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
s1, root := TestACLServer(t, nil)
|
s1, root := TestACLServer(t, nil)
|
||||||
|
|
Loading…
Reference in New Issue