Add client scheduling eligibility to heartbeat (#14483)
This commit is contained in:
parent
3fc7482ecd
commit
e58998e218
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
metrics: Update client `node_scheduling_eligibility` value with server heartbeats.
|
||||
```
|
|
@ -1955,6 +1955,14 @@ func (c *Client) updateNodeStatus() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Check heartbeat response for information about the server-side scheduling
|
||||
// state of this node
|
||||
c.UpdateConfig(func(c *config.Config) {
|
||||
if resp.SchedulingEligibility != "" {
|
||||
c.Node.SchedulingEligibility = resp.SchedulingEligibility
|
||||
}
|
||||
})
|
||||
|
||||
// Update the number of nodes in the cluster so we can adjust our server
|
||||
// rebalance rate.
|
||||
c.servers.SetNumNodes(resp.NumNodes)
|
||||
|
|
|
@ -199,7 +199,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
|
|||
|
||||
n.srv.peerLock.RLock()
|
||||
defer n.srv.peerLock.RUnlock()
|
||||
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
|
||||
if err := n.constructNodeServerInfoResponse(args.Node.ID, snap, reply); err != nil {
|
||||
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -258,7 +258,7 @@ func equalDevices(n1, n2 *structs.Node) bool {
|
|||
}
|
||||
|
||||
// constructNodeServerInfoResponse assumes the n.srv.peerLock is held for reading.
|
||||
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
|
||||
func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
|
||||
reply.LeaderRPCAddr = string(n.srv.raft.Leader())
|
||||
|
||||
// Reply with config information required for future RPC requests
|
||||
|
@ -271,6 +271,10 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
|
|||
})
|
||||
}
|
||||
|
||||
// Add the node's scheduling eligibility to the heartbeat response.
|
||||
node, _ := snap.NodeByID(nil, nodeID)
|
||||
reply.SchedulingEligibility = node.SchedulingEligibility
|
||||
|
||||
// TODO(sean@): Use an indexed node count instead
|
||||
//
|
||||
// Snapshot is used only to iterate over all nodes to create a node
|
||||
|
@ -564,7 +568,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
|
|||
reply.Index = index
|
||||
n.srv.peerLock.RLock()
|
||||
defer n.srv.peerLock.RUnlock()
|
||||
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
|
||||
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
|
||||
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -821,7 +825,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
|
|||
|
||||
n.srv.peerLock.RLock()
|
||||
defer n.srv.peerLock.RUnlock()
|
||||
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
|
||||
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
|
||||
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1361,6 +1361,10 @@ type NodeUpdateResponse struct {
|
|||
// region.
|
||||
Servers []*NodeServerInfo
|
||||
|
||||
// SchedulingEligibility informs clients, during heartbeats, of the scheduling
|
||||
// eligibility that the servers have recorded for their node.
|
||||
SchedulingEligibility string
|
||||
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue