core: emit node evals only for sys jobs in dc (#12955)
Whenever a node joins the cluster, either for the first time or after being `down`, we emit an evaluation for every system job to ensure all applicable system jobs are running on the node. This patch adds an optimization to skip creating evaluations for system jobs not in the current node's DC. While the scheduler performs the same feasibility check, skipping the creation of the evaluation altogether saves disk, network, and memory.
This commit is contained in:
parent
a9a66ad018
commit
f21272065d
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
core: On node updates skip creating evaluations for jobs not in the node's datacenter.
|
||||||
|
```
|
|
@ -171,7 +171,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
|
||||||
|
|
||||||
// Check if we should trigger evaluations
|
// Check if we should trigger evaluations
|
||||||
if shouldCreateNodeEval(originalNode, args.Node) {
|
if shouldCreateNodeEval(originalNode, args.Node) {
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(args.Node, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -350,15 +350,16 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
nodes := make([]*structs.Node, 0, len(args.NodeIDs))
|
||||||
for _, nodeID := range args.NodeIDs {
|
for _, nodeID := range args.NodeIDs {
|
||||||
node, err := snap.NodeByID(ws, nodeID)
|
node, err := snap.NodeByID(nil, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return fmt.Errorf("node not found")
|
return fmt.Errorf("node not found")
|
||||||
}
|
}
|
||||||
|
nodes = append(nodes, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit this update via Raft
|
// Commit this update via Raft
|
||||||
|
@ -368,19 +369,21 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, nodeID := range args.NodeIDs {
|
for _, node := range nodes {
|
||||||
|
nodeID := node.ID
|
||||||
|
|
||||||
// Clear the heartbeat timer if any
|
// Clear the heartbeat timer if any
|
||||||
n.srv.clearHeartbeatTimer(nodeID)
|
n.srv.clearHeartbeatTimer(nodeID)
|
||||||
|
|
||||||
// Create the evaluations for this node
|
// Create the evaluations for this node
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine if there are any Vault accessors on the node
|
// Determine if there are any Vault accessors on the node
|
||||||
if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil {
|
if accessors, err := snap.VaultAccessorsByNode(nil, nodeID); err != nil {
|
||||||
n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err)
|
n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err)
|
||||||
return err
|
return err
|
||||||
} else if l := len(accessors); l > 0 {
|
} else if l := len(accessors); l > 0 {
|
||||||
|
@ -392,7 +395,7 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine if there are any SI token accessors on the node
|
// Determine if there are any SI token accessors on the node
|
||||||
if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil {
|
if accessors, err := snap.SITokenAccessorsByNode(nil, nodeID); err != nil {
|
||||||
n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err)
|
n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err)
|
||||||
return err
|
return err
|
||||||
} else if l := len(accessors); l > 0 {
|
} else if l := len(accessors); l > 0 {
|
||||||
|
@ -490,7 +493,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
|
||||||
// Check if we should trigger evaluations
|
// Check if we should trigger evaluations
|
||||||
if structs.ShouldDrainNode(args.Status) ||
|
if structs.ShouldDrainNode(args.Status) ||
|
||||||
nodeStatusTransitionRequiresEval(args.Status, node.Status) {
|
nodeStatusTransitionRequiresEval(args.Status, node.Status) {
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -658,7 +661,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
|
||||||
// If the node is transitioning to be eligible, create Node evaluations
|
// If the node is transitioning to be eligible, create Node evaluations
|
||||||
// because there may be a System job registered that should be evaluated.
|
// because there may be a System job registered that should be evaluated.
|
||||||
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
|
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -754,7 +757,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
|
||||||
// If the node is transitioning to be eligible, create Node evaluations
|
// If the node is transitioning to be eligible, create Node evaluations
|
||||||
// because there may be a System job registered that should be evaluated.
|
// because there may be a System job registered that should be evaluated.
|
||||||
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
|
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
|
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -802,7 +805,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the evaluation
|
// Create the evaluation
|
||||||
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex)
|
evalIDs, evalIndex, err := n.createNodeEvals(node, node.ModifyIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.logger.Error("eval creation failed", "error", err)
|
n.logger.Error("eval creation failed", "error", err)
|
||||||
return err
|
return err
|
||||||
|
@ -1444,7 +1447,9 @@ func (n *Node) List(args *structs.NodeListRequest,
|
||||||
|
|
||||||
// createNodeEvals is used to create evaluations for each alloc on a node.
|
// createNodeEvals is used to create evaluations for each alloc on a node.
|
||||||
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
|
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
|
||||||
func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
|
func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) {
|
||||||
|
nodeID := node.ID
|
||||||
|
|
||||||
// Snapshot the state
|
// Snapshot the state
|
||||||
snap, err := n.srv.fsm.State().Snapshot()
|
snap, err := n.srv.fsm.State().Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1452,20 +1457,30 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find all the allocations for this node
|
// Find all the allocations for this node
|
||||||
ws := memdb.NewWatchSet()
|
allocs, err := snap.AllocsByNode(nil, nodeID)
|
||||||
allocs, err := snap.AllocsByNode(ws, nodeID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
|
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sysJobsIter, err := snap.JobsByScheduler(ws, "system")
|
sysJobsIter, err := snap.JobsByScheduler(nil, "system")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
|
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var sysJobs []*structs.Job
|
var sysJobs []*structs.Job
|
||||||
for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() {
|
for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
|
||||||
sysJobs = append(sysJobs, job.(*structs.Job))
|
job := jobI.(*structs.Job)
|
||||||
|
// Avoid creating evals for jobs that don't run in this
|
||||||
|
// datacenter. We could perform an entire feasibility check
|
||||||
|
// here, but datacenter is a good optimization to start with as
|
||||||
|
// datacenter cardinality tends to be low so the check
|
||||||
|
// shouldn't add much work.
|
||||||
|
for _, dc := range job.Datacenters {
|
||||||
|
if dc == node.Datacenter {
|
||||||
|
sysJobs = append(sysJobs, job)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fast-path if nothing to do
|
// Fast-path if nothing to do
|
||||||
|
|
|
@ -2677,23 +2677,32 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
|
||||||
s1, cleanupS1 := TestServer(t, nil)
|
s1, cleanupS1 := TestServer(t, nil)
|
||||||
defer cleanupS1()
|
defer cleanupS1()
|
||||||
testutil.WaitForLeader(t, s1.RPC)
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
state := s1.fsm.State()
|
||||||
|
|
||||||
|
idx, err := state.LatestIndex()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
node := mock.Node()
|
||||||
|
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
|
||||||
|
require.NoError(t, err)
|
||||||
|
idx++
|
||||||
|
|
||||||
// Inject fake evaluations
|
// Inject fake evaluations
|
||||||
alloc := mock.Alloc()
|
alloc := mock.Alloc()
|
||||||
state := s1.fsm.State()
|
alloc.NodeID = node.ID
|
||||||
state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
||||||
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil {
|
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, idx, []*structs.Allocation{alloc}))
|
||||||
t.Fatalf("err: %v", err)
|
idx++
|
||||||
}
|
|
||||||
|
|
||||||
// Inject a fake system job.
|
// Inject a fake system job.
|
||||||
job := mock.SystemJob()
|
job := mock.SystemJob()
|
||||||
if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil {
|
if err := state.UpsertJob(structs.MsgTypeTestSetup, idx, job); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
idx++
|
||||||
|
|
||||||
// Create some evaluations
|
// Create some evaluations
|
||||||
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
|
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -2790,7 +2799,7 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
|
||||||
idx++
|
idx++
|
||||||
|
|
||||||
// Create some evaluations
|
// Create some evaluations
|
||||||
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1)
|
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotZero(t, index)
|
require.NotZero(t, index)
|
||||||
require.Len(t, evalIDs, 2)
|
require.Len(t, evalIDs, 2)
|
||||||
|
@ -2815,6 +2824,51 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
|
||||||
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
|
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestClientEndpoint_CreateNodeEvals_MultipleDCes asserts that evals are made
|
||||||
|
// only for the DC the node is in.
|
||||||
|
func TestClientEndpoint_CreateNodeEvals_MultipleDCes(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
s1, cleanupS1 := TestServer(t, nil)
|
||||||
|
defer cleanupS1()
|
||||||
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
|
state := s1.fsm.State()
|
||||||
|
|
||||||
|
idx, err := state.LatestIndex()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
node := mock.Node()
|
||||||
|
node.Datacenter = "test1"
|
||||||
|
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
|
||||||
|
require.NoError(t, err)
|
||||||
|
idx++
|
||||||
|
|
||||||
|
// Inject a fake system job in the same dc
|
||||||
|
defaultJob := mock.SystemJob()
|
||||||
|
defaultJob.Datacenters = []string{"test1", "test2"}
|
||||||
|
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, defaultJob)
|
||||||
|
require.NoError(t, err)
|
||||||
|
idx++
|
||||||
|
|
||||||
|
// Inject a fake system job in a different dc
|
||||||
|
nsJob := mock.SystemJob()
|
||||||
|
nsJob.Datacenters = []string{"test2", "test3"}
|
||||||
|
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, nsJob)
|
||||||
|
require.NoError(t, err)
|
||||||
|
idx++
|
||||||
|
|
||||||
|
// Create evaluations
|
||||||
|
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotZero(t, index)
|
||||||
|
require.Len(t, evalIDs, 1)
|
||||||
|
|
||||||
|
eval, err := state.EvalByID(nil, evalIDs[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, defaultJob.ID, eval.JobID)
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientEndpoint_Evaluate(t *testing.T) {
|
func TestClientEndpoint_Evaluate(t *testing.T) {
|
||||||
ci.Parallel(t)
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue