heartbeat: Handle transitioning from disconnected to down (#12559)
parent 0891218ee9
commit 4d3a0aae6d
@@ -152,6 +152,8 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
 
 	h.logger.Warn("node TTL expired", "node_id", id)
 
+	canDisconnect, hasPendingReconnects := h.disconnectState(id)
+
 	// Make a request to update the node status
 	req := structs.NodeUpdateStatusRequest{
 		NodeID: id,
@@ -162,7 +164,7 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
 		},
 	}
 
-	if h.shouldDisconnect(id) {
+	if canDisconnect && hasPendingReconnects {
 		req.Status = structs.NodeStatusDisconnected
 	}
 
@@ -172,21 +174,46 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
 	}
 }
 
-func (h *nodeHeartbeater) shouldDisconnect(id string) bool {
+func (h *nodeHeartbeater) disconnectState(id string) (bool, bool) {
+	node, err := h.State().NodeByID(nil, id)
+	if err != nil {
+		h.logger.Error("error retrieving node by id", "error", err)
+		return false, false
+	}
+
+	// Exit if this is the node already in a state other than ready.
+	if node.Status != structs.NodeStatusReady {
+		return false, false
+	}
+
 	allocs, err := h.State().AllocsByNode(nil, id)
 	if err != nil {
 		h.logger.Error("error retrieving allocs by node", "error", err)
-		return false
+		return false, false
 	}
 
 	now := time.Now().UTC()
+	// Check if the node has any allocs that are configured with max_client_disconnect,
+	// that are past the disconnect window, and if so, whether it has at least one
+	// alloc that isn't yet expired.
+	nodeCanDisconnect := false
 	for _, alloc := range allocs {
-		if alloc.DisconnectTimeout(now).After(now) {
-			return true
+		allocCanDisconnect := alloc.DisconnectTimeout(now).After(now)
+		// Only process this until we find that at least one alloc is configured
+		// with max_client_disconnect.
+		if !nodeCanDisconnect && allocCanDisconnect {
+			nodeCanDisconnect = true
+		}
+		// Only process this until we find one that we want to run and has not
+		// yet expired.
+		if allocCanDisconnect &&
+			alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
+			!alloc.Expired(now) {
+			return true, true
 		}
 	}
 
-	return false
+	return nodeCanDisconnect, false
 }
 
 // clearHeartbeatTimer is used to clear the heartbeat time for
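Taken together, the hunks above (the server-side heartbeater) keep NodeStatusDown as the default status written when a node's TTL expires, and only override it to NodeStatusDisconnected when the node has allocs configured with max_client_disconnect and at least one of those allocs still wants to run and has not expired. Below is a minimal standalone sketch of that decision; the alloc type with its desiredRun, lastSeen, and expiresAt members is a simplified stand-in, not the structs.Allocation API, and the deadline arithmetic is deliberately simpler than Allocation.DisconnectTimeout/Expired.

package main

import (
	"fmt"
	"time"
)

// alloc is a hypothetical stand-in for structs.Allocation, keeping only the
// fields this decision needs.
type alloc struct {
	desiredRun          bool           // stand-in for DesiredStatus == run
	maxClientDisconnect *time.Duration // nil means no reconnect window configured
	lastSeen            time.Time      // when the client was last heard from
}

// expiresAt models a per-alloc reconnect deadline: last contact plus the
// configured window. This is only an illustration of the idea.
func (a alloc) expiresAt() time.Time {
	if a.maxClientDisconnect == nil {
		return a.lastSeen
	}
	return a.lastSeen.Add(*a.maxClientDisconnect)
}

// disconnectState mirrors the shape of the new helper: the first result says
// whether any alloc has a reconnect window at all, the second whether at least
// one such alloc still wants to run and has not yet expired.
func disconnectState(now time.Time, allocs []alloc) (canDisconnect, hasPendingReconnects bool) {
	for _, a := range allocs {
		if a.maxClientDisconnect == nil {
			continue
		}
		canDisconnect = true
		if a.desiredRun && a.expiresAt().After(now) {
			return true, true
		}
	}
	return canDisconnect, false
}

func main() {
	now := time.Now().UTC()
	window := 5 * time.Second

	cases := map[string][]alloc{
		"pending-reconnects": {{desiredRun: true, maxClientDisconnect: &window, lastSeen: now}},
		"expired-reconnects": {{desiredRun: true, maxClientDisconnect: &window, lastSeen: now.Add(-10 * time.Second)}},
		"no-reconnects":      {{desiredRun: true, lastSeen: now}},
	}

	for name, allocs := range cases {
		canDisconnect, hasPending := disconnectState(now, allocs)
		status := "down"
		if canDisconnect && hasPending {
			// Same condition invalidateHeartbeat now uses before overriding
			// the default NodeStatusDown in the status update request.
			status = "disconnected"
		}
		fmt.Printf("%-18s -> node marked %s\n", name, status)
	}
}

Each scenario prints the status the node would receive, matching the expectations encoded in the new table test in the next file.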
@@ -8,6 +8,7 @@ import (
 	memdb "github.com/hashicorp/go-memdb"
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
@@ -285,3 +286,71 @@ func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) {
 		t.Fatalf("err: %s", err)
 	})
 }
+
+func TestHeartbeat_InvalidateHeartbeat_DisconnectedClient(t *testing.T) {
+	ci.Parallel(t)
+
+	type testCase struct {
+		name                string
+		now                 time.Time
+		maxClientDisconnect *time.Duration
+		expectedNodeStatus  string
+	}
+
+	testCases := []testCase{
+		{
+			name:                "has-pending-reconnects",
+			now:                 time.Now().UTC(),
+			maxClientDisconnect: helper.TimeToPtr(5 * time.Second),
+			expectedNodeStatus:  structs.NodeStatusDisconnected,
+		},
+		{
+			name:                "has-expired-reconnects",
+			maxClientDisconnect: helper.TimeToPtr(5 * time.Second),
+			now:                 time.Now().UTC().Add(-10 * time.Second),
+			expectedNodeStatus:  structs.NodeStatusDown,
+		},
+		{
+			name:                "has-expired-reconnects-equal-timestamp",
+			maxClientDisconnect: helper.TimeToPtr(5 * time.Second),
+			now:                 time.Now().UTC().Add(-5 * time.Second),
+			expectedNodeStatus:  structs.NodeStatusDown,
+		},
+		{
+			name:                "has-no-reconnects",
+			now:                 time.Now().UTC(),
+			maxClientDisconnect: nil,
+			expectedNodeStatus:  structs.NodeStatusDown,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			s1, cleanupS1 := TestServer(t, nil)
+			defer cleanupS1()
+			testutil.WaitForLeader(t, s1.RPC)
+
+			// Create a node
+			node := mock.Node()
+			state := s1.fsm.State()
+			require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))
+
+			alloc := mock.Alloc()
+			alloc.NodeID = node.ID
+			alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxClientDisconnect
+			alloc.ClientStatus = structs.AllocClientStatusUnknown
+			alloc.AllocStates = []*structs.AllocState{{
+				Field: structs.AllocStateFieldClientStatus,
+				Value: structs.AllocClientStatusUnknown,
+				Time:  tc.now,
+			}}
+			require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}))
+
+			// Trigger status update
+			s1.invalidateHeartbeat(node.ID)
+			out, err := state.NodeByID(nil, node.ID)
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedNodeStatus, out.Status)
+		})
+	}
+}
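The equal-timestamp case pins down the boundary: a reconnect window that has exactly elapsed is treated as expired, so the node is still marked down. The snippet below is a tiny standalone illustration of that boundary using Go's strict time comparison; the deadline arithmetic is a simplification, not the Allocation method. The table can also be run on its own with go test -run TestHeartbeat_InvalidateHeartbeat_DisconnectedClient from the package directory.

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now().UTC()
	window := 5 * time.Second

	// The alloc went into the unknown state exactly one window ago, so its
	// simplified reconnect deadline is exactly "now".
	unknownSince := now.Add(-window)
	deadline := unknownSince.Add(window)

	// After is strict: an exactly-elapsed window is not in the future, so it
	// does not count as a pending reconnect and the node lands on "down".
	fmt.Println(deadline.After(now)) // false
}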
@@ -217,10 +217,10 @@ func shouldCreateNodeEval(original, updated *structs.Node) bool {
 	}
 
 	if original == nil {
-		return transitionedToReady(updated.Status, structs.NodeStatusInit)
+		return nodeStatusTransitionRequiresEval(updated.Status, structs.NodeStatusInit)
 	}
 
-	if transitionedToReady(updated.Status, original.Status) {
+	if nodeStatusTransitionRequiresEval(updated.Status, original.Status) {
 		return true
 	}
 
@@ -486,8 +486,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 	}
 
 	// Check if we should trigger evaluations
-	transitionToReady := transitionedToReady(args.Status, node.Status)
-	if structs.ShouldDrainNode(args.Status) || transitionToReady || args.Status == structs.NodeStatusDisconnected {
+	if structs.ShouldDrainNode(args.Status) ||
+		nodeStatusTransitionRequiresEval(args.Status, node.Status) {
 		evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
 		if err != nil {
 			n.logger.Error("eval creation failed", "error", err)
@@ -546,9 +546,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 			}
 		}
 
-	case structs.NodeStatusDisconnected:
-		n.logger.Trace(fmt.Sprintf("heartbeat reset skipped for disconnected node %q", args.NodeID))
-
 	default:
 		ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
 		if err != nil {
@@ -570,13 +567,13 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 	return nil
 }
 
-// transitionedToReady is a helper that takes a nodes new and old status and
+// nodeStatusTransitionRequiresEval is a helper that takes a nodes new and old status and
 // returns whether it has transitioned to ready.
-func transitionedToReady(newStatus, oldStatus string) bool {
+func nodeStatusTransitionRequiresEval(newStatus, oldStatus string) bool {
 	initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady
 	terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady
-	disconnectedToReady := oldStatus == structs.NodeStatusDisconnected && newStatus == structs.NodeStatusReady
-	return initToReady || terminalToReady || disconnectedToReady
+	disconnectedToOther := oldStatus == structs.NodeStatusDisconnected && newStatus != structs.NodeStatusDisconnected
+	return initToReady || terminalToReady || disconnectedToOther
 }
 
 // UpdateDrain is used to update the drain mode of a client node
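The rename in this last file is doing real work: the old transitionedToReady only matched disconnected -> ready, so a disconnected node that the heartbeater now marks down would not have triggered node evaluations, while the new disconnectedToOther clause fires on any transition out of disconnected. Below is a standalone sketch of which transitions the renamed helper flags; the helper body is copied from the hunk above, but the local constants and their literal string values are stand-ins for structs.NodeStatus*, not taken from the diff.

package main

import "fmt"

// Local constants mirroring the structs.NodeStatus* values for illustration
// only; the literal strings here are assumptions.
const (
	nodeStatusInit         = "initializing"
	nodeStatusReady        = "ready"
	nodeStatusDown         = "down"
	nodeStatusDisconnected = "disconnected"
)

// nodeStatusTransitionRequiresEval copies the renamed helper, with the structs
// constants swapped for the local ones above.
func nodeStatusTransitionRequiresEval(newStatus, oldStatus string) bool {
	initToReady := oldStatus == nodeStatusInit && newStatus == nodeStatusReady
	terminalToReady := oldStatus == nodeStatusDown && newStatus == nodeStatusReady
	disconnectedToOther := oldStatus == nodeStatusDisconnected && newStatus != nodeStatusDisconnected
	return initToReady || terminalToReady || disconnectedToOther
}

func main() {
	transitions := []struct{ from, to string }{
		{nodeStatusDisconnected, nodeStatusReady}, // eval before and after this change
		{nodeStatusDisconnected, nodeStatusDown},  // eval only after this change
		{nodeStatusInit, nodeStatusReady},         // unchanged
		{nodeStatusReady, nodeStatusDown},         // still not flagged by this helper
	}
	for _, tr := range transitions {
		fmt.Printf("%s -> %s: requiresEval=%v\n", tr.from, tr.to, nodeStatusTransitionRequiresEval(tr.to, tr.from))
	}
}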