Consider all system jobs for a new node (#11054)

When a node becomes ready, create an eval for all system jobs across
namespaces.

The previous code uses `job.ID` to deduplicate evals, but that ignores
the job namespace. Thus if there are multiple jobs in different
namespaces sharing the same ID/Name, only one will be considered for
running in the new node. Thus, Nomad may skip running some system jobs
in that node.
This commit is contained in:
Mahmood Ali 2021-08-18 09:50:37 -04:00 committed by GitHub
parent 97966c7a71
commit 84a3522133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 9 deletions

3
.changelog/11054.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where system jobs with non-unique IDs may not be placed on new nodes
```

View File

@ -1387,15 +1387,15 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
// Create an eval for each JobID affected // Create an eval for each JobID affected
var evals []*structs.Evaluation var evals []*structs.Evaluation
var evalIDs []string var evalIDs []string
jobIDs := make(map[string]struct{}) jobIDs := map[structs.NamespacedID]struct{}{}
now := time.Now().UTC().UnixNano() now := time.Now().UTC().UnixNano()
for _, alloc := range allocs { for _, alloc := range allocs {
// Deduplicate on JobID // Deduplicate on JobID
if _, ok := jobIDs[alloc.JobID]; ok { if _, ok := jobIDs[alloc.JobNamespacedID()]; ok {
continue continue
} }
jobIDs[alloc.JobID] = struct{}{} jobIDs[alloc.JobNamespacedID()] = struct{}{}
// Create a new eval // Create a new eval
eval := &structs.Evaluation{ eval := &structs.Evaluation{
@ -1418,10 +1418,10 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
// Create an evaluation for each system job. // Create an evaluation for each system job.
for _, job := range sysJobs { for _, job := range sysJobs {
// Still dedup on JobID as the node may already have the system job. // Still dedup on JobID as the node may already have the system job.
if _, ok := jobIDs[job.ID]; ok { if _, ok := jobIDs[job.NamespacedID()]; ok {
continue continue
} }
jobIDs[job.ID] = struct{}{} jobIDs[job.NamespacedID()] = struct{}{}
// Create a new eval // Create a new eval
eval := &structs.Evaluation{ eval := &structs.Evaluation{

View File

@ -2707,6 +2707,67 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
} }
} }
// TestClientEndpoint_CreateNodeEvals_MultipleNSes asserts that evals are made
// for all jobs across namespaces
func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
idx := uint64(3)
ns1 := mock.Namespace()
err := state.UpsertNamespaces(idx, []*structs.Namespace{ns1})
require.NoError(t, err)
idx++
node := mock.Node()
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
require.NoError(t, err)
idx++
// Inject a fake system job.
defaultJob := mock.SystemJob()
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, defaultJob)
require.NoError(t, err)
idx++
nsJob := mock.SystemJob()
nsJob.ID = defaultJob.ID
nsJob.Namespace = ns1.Name
err = state.UpsertJob(structs.MsgTypeTestSetup, idx, nsJob)
require.NoError(t, err)
idx++
// Create some evaluations
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1)
require.NoError(t, err)
require.NotZero(t, index)
require.Len(t, evalIDs, 2)
byNS := map[string]*structs.Evaluation{}
for _, evalID := range evalIDs {
eval, err := state.EvalByID(nil, evalID)
require.NoError(t, err)
byNS[eval.Namespace] = eval
}
require.Len(t, byNS, 2)
defaultNSEval := byNS[defaultJob.Namespace]
require.NotNil(t, defaultNSEval)
require.Equal(t, defaultJob.ID, defaultNSEval.JobID)
require.Equal(t, defaultJob.Namespace, defaultNSEval.Namespace)
otherNSEval := byNS[nsJob.Namespace]
require.NotNil(t, otherNSEval)
require.Equal(t, nsJob.ID, otherNSEval.JobID)
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
}
func TestClientEndpoint_Evaluate(t *testing.T) { func TestClientEndpoint_Evaluate(t *testing.T) {
t.Parallel() t.Parallel()

View File

@ -4081,8 +4081,8 @@ type Job struct {
} }
// NamespacedID returns the namespaced id useful for logging // NamespacedID returns the namespaced id useful for logging
func (j *Job) NamespacedID() *NamespacedID { func (j *Job) NamespacedID() NamespacedID {
return &NamespacedID{ return NamespacedID{
ID: j.ID, ID: j.ID,
Namespace: j.Namespace, Namespace: j.Namespace,
} }

View File

@ -153,7 +153,7 @@ type BinPackIterator struct {
source RankIterator source RankIterator
evict bool evict bool
priority int priority int
jobId *structs.NamespacedID jobId structs.NamespacedID
taskGroup *structs.TaskGroup taskGroup *structs.TaskGroup
memoryOversubscription bool memoryOversubscription bool
scoreFit func(*structs.Node, *structs.ComparableResources) float64 scoreFit func(*structs.Node, *structs.ComparableResources) float64
@ -233,7 +233,7 @@ OUTER:
var allocsToPreempt []*structs.Allocation var allocsToPreempt []*structs.Allocation
// Initialize preemptor with node // Initialize preemptor with node
preemptor := NewPreemptor(iter.priority, iter.ctx, iter.jobId) preemptor := NewPreemptor(iter.priority, iter.ctx, &iter.jobId)
preemptor.SetNode(option.Node) preemptor.SetNode(option.Node)
// Count the number of existing preemptions // Count the number of existing preemptions