[core] nil check and error handling for client status in heartbeat responses (#17316)
Add a nil check to constructNodeServerInfoResponse to manage an apparent race between deregister and client heartbeats. Fixes #17310
This commit is contained in:
parent
98aa88c739
commit
86e04a4c6c
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
core: Fix panic around client deregistration and pending heartbeats
|
||||||
|
```
|
|
@ -2024,7 +2024,8 @@ func (c *Client) updateNodeStatus() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check heartbeat response for information about the server-side scheduling
|
// Check heartbeat response for information about the server-side scheduling
|
||||||
// state of this node
|
// state of this node. If there are errors on the server side, this will come
|
||||||
|
// back as an empty string.
|
||||||
c.UpdateConfig(func(c *config.Config) {
|
c.UpdateConfig(func(c *config.Config) {
|
||||||
if resp.SchedulingEligibility != "" {
|
if resp.SchedulingEligibility != "" {
|
||||||
c.Node.SchedulingEligibility = resp.SchedulingEligibility
|
c.Node.SchedulingEligibility = resp.SchedulingEligibility
|
||||||
|
|
|
@ -286,7 +286,7 @@ func equalDevices(n1, n2 *structs.Node) bool {
|
||||||
return reflect.DeepEqual(n1.NodeResources.Devices, n2.NodeResources.Devices)
|
return reflect.DeepEqual(n1.NodeResources.Devices, n2.NodeResources.Devices)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateNodeUpdateResponse assumes the n.srv.peerLock is held for reading.
|
// constructNodeServerInfoResponse assumes the n.srv.peerLock is held for reading.
|
||||||
func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
|
func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
|
||||||
reply.LeaderRPCAddr = string(n.srv.raft.Leader())
|
reply.LeaderRPCAddr = string(n.srv.raft.Leader())
|
||||||
|
|
||||||
|
@ -300,16 +300,30 @@ func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateS
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
|
||||||
// Add ClientStatus information to heartbeat response.
|
// Add ClientStatus information to heartbeat response.
|
||||||
node, _ := snap.NodeByID(nil, nodeID)
|
if node, err := snap.NodeByID(ws, nodeID); err == nil && node != nil {
|
||||||
reply.SchedulingEligibility = node.SchedulingEligibility
|
reply.SchedulingEligibility = node.SchedulingEligibility
|
||||||
|
} else if node == nil {
|
||||||
|
|
||||||
|
// If the node is not found, leave reply.SchedulingEligibility as
|
||||||
|
// the empty string. The response handler in the client treats this
|
||||||
|
// as a no-op. As there is no call to action for an operator, log it
|
||||||
|
// at debug level.
|
||||||
|
n.logger.Debug("constructNodeServerInfoResponse: node not found",
|
||||||
|
"node_id", nodeID)
|
||||||
|
} else {
|
||||||
|
|
||||||
|
// This case is likely only reached via a code error in state store
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(sean@): Use an indexed node count instead
|
// TODO(sean@): Use an indexed node count instead
|
||||||
//
|
//
|
||||||
// Snapshot is used only to iterate over all nodes to create a node
|
// Snapshot is used only to iterate over all nodes to create a node
|
||||||
// count to send back to Nomad Clients in their heartbeat so Clients
|
// count to send back to Nomad Clients in their heartbeat so Clients
|
||||||
// can estimate the size of the cluster.
|
// can estimate the size of the cluster.
|
||||||
ws := memdb.NewWatchSet()
|
|
||||||
iter, err := snap.Nodes(ws)
|
iter, err := snap.Nodes(ws)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -4434,3 +4434,23 @@ func TestNode_List_PaginationFiltering(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNode_constructNodeServerInfoResponse_MissingNode(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
s, cleanup := TestServer(t, nil)
|
||||||
|
defer cleanup()
|
||||||
|
testutil.WaitForLeader(t, s.RPC)
|
||||||
|
|
||||||
|
// Create a node that isn't a member of the state to force a not found
|
||||||
|
node := mock.Node()
|
||||||
|
var reply structs.NodeUpdateResponse
|
||||||
|
|
||||||
|
nE := NewNodeEndpoint(s, nil)
|
||||||
|
snap, err := s.State().Snapshot()
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
// call constructNodeServerInfoResponse. Before GH #17316 this would panic
|
||||||
|
require.NoError(t, nE.constructNodeServerInfoResponse(node.ID, snap, &reply))
|
||||||
|
must.NotNil(t, &reply)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue