diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 6ebbdf158..cca75e5de 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -226,7 +226,6 @@ func (w *deploymentWatcher) SetAllocHealth( if j != nil { resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version) } - w.setLatestEval(index) return nil } @@ -265,7 +264,6 @@ func (w *deploymentWatcher) PromoteDeployment( resp.EvalCreateIndex = index resp.DeploymentModifyIndex = index resp.Index = index - w.setLatestEval(index) return nil } @@ -297,7 +295,6 @@ func (w *deploymentWatcher) PauseDeployment( } resp.DeploymentModifyIndex = i resp.Index = i - w.setLatestEval(i) return nil } @@ -347,7 +344,6 @@ func (w *deploymentWatcher) FailDeployment( if rollbackJob != nil { resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version) } - w.setLatestEval(i) return nil } @@ -490,10 +486,8 @@ FAIL: // Update the status of the deployment to failed and create an evaluation. e := w.getEval() u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc) - if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil { + if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil { w.logger.Error("failed to update deployment status", "error", err) - } else { - w.setLatestEval(index) } } @@ -512,7 +506,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( var res allocUpdateResult // Get the latest evaluation index - latestEval, err := w.latestEvalIndex() + latestEval, blocked, err := w.jobEvalStatus() if err != nil { if err == context.Canceled || w.ctx.Err() == context.Canceled { return res, err @@ -528,19 +522,20 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) ( continue } - // Nothing to do for this allocation - if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval { - continue - } - // Determine if the update stanza for this group is progress based progressBased := dstate.ProgressDeadline != 0 - // We need to create an eval so the job can progress. - if alloc.DeploymentStatus.IsHealthy() { - res.createEval = true - } else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() { + // Check if the allocation has failed and we need to mark it for allow + // replacements + if progressBased && alloc.DeploymentStatus.IsUnhealthy() && + deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() { res.allowReplacements = append(res.allowReplacements, alloc.ID) + continue + } + + // We need to create an eval so the job can progress. + if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval { + res.createEval = true } // If the group is using a progress deadline, we don't have to do anything. @@ -685,10 +680,8 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI w.l.Unlock() // Create the eval - if index, err := w.createUpdate(replacements, w.getEval()); err != nil { + if _, err := w.createUpdate(replacements, w.getEval()); err != nil { w.logger.Error("failed to create evaluation for deployment", "deployment_id", w.deploymentID, "error", err) - } else { - w.setLatestEval(index) } }) } @@ -764,71 +757,68 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS return nil, 0, err } + maxIndex := uint64(0) stubs := make([]*structs.AllocListStub, 0, len(allocs)) for _, alloc := range allocs { stubs = append(stubs, alloc.Stub()) + + if maxIndex < alloc.ModifyIndex { + maxIndex = alloc.ModifyIndex + } } - // Use the last index that affected the jobs table - index, err := state.Index("allocs") - if err != nil { - return nil, index, err + // Use the last index that affected the allocs table + if len(stubs) == 0 { + index, err := state.Index("allocs") + if err != nil { + return nil, index, err + } + maxIndex = index } - return stubs, index, nil + return stubs, maxIndex, nil } -// latestEvalIndex returns the index of the last evaluation created for -// the job. The index is used to determine if an allocation update requires an -// evaluation to be triggered. -func (w *deploymentWatcher) latestEvalIndex() (uint64, error) { +// jobEvalStatus returns the eval status for a job. It returns the index of the +// last evaluation created for the job, as well as whether there exists a +// blocked evaluation for the job. The index is used to determine if an +// allocation update requires an evaluation to be triggered. If there already is +// a blocked evaluations, no eval should be created. +func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) { if err := w.queryLimiter.Wait(w.ctx); err != nil { - return 0, err + return 0, false, err } snap, err := w.state.Snapshot() if err != nil { - return 0, err + return 0, false, err } evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID) if err != nil { - return 0, err + return 0, false, err } if len(evals) == 0 { - idx, err := snap.Index("evals") - if err != nil { - w.setLatestEval(idx) + index, err := snap.Index("evals") + return index, false, err + } + + var max uint64 + for _, eval := range evals { + // If we have a blocked eval, then we do not care what the index is + // since we will not need to make a new eval. + if eval.ShouldBlock() { + return 0, true, nil } - return idx, err + // Prefer using the snapshot index. Otherwise use the create index + if eval.SnapshotIndex != 0 && max < eval.SnapshotIndex { + max = eval.SnapshotIndex + } else if max < eval.CreateIndex { + max = eval.CreateIndex + } } - // Prefer using the snapshot index. Otherwise use the create index - e := evals[0] - if e.SnapshotIndex != 0 { - w.setLatestEval(e.SnapshotIndex) - return e.SnapshotIndex, nil - } - - w.setLatestEval(e.CreateIndex) - return e.CreateIndex, nil -} - -// setLatestEval sets the given index as the latest eval unless the currently -// stored index is higher. -func (w *deploymentWatcher) setLatestEval(index uint64) { - w.l.Lock() - defer w.l.Unlock() - if index > w.latestEval { - w.latestEval = index - } -} - -// getLatestEval returns the latest eval index. -func (w *deploymentWatcher) getLatestEval() uint64 { - w.l.Lock() - defer w.l.Unlock() - return w.latestEval + return max, false, nil } diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index 4f6b7e9cc..35380cfdb 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -447,6 +447,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St } // Capture the allocs for each draining job. + var maxIndex uint64 = 0 resp := make(map[structs.NamespacedID][]*structs.Allocation, l) for jns := range draining { allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false) @@ -455,6 +456,17 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St } resp[jns] = allocs + for _, alloc := range allocs { + if maxIndex < alloc.ModifyIndex { + maxIndex = alloc.ModifyIndex + } + } + } + + // Prefer using the actual max index of affected allocs since it means less + // unblocking + if maxIndex != 0 { + index = maxIndex } return resp, index, nil diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 65ec06206..7639bc804 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -235,6 +235,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto return nil, 0, err } + var maxIndex uint64 = 0 resp := make(map[string]*structs.Node, 64) for { raw := iter.Next() @@ -244,6 +245,15 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto node := raw.(*structs.Node) resp[node.ID] = node + if maxIndex < node.ModifyIndex { + maxIndex = node.ModifyIndex + } + } + + // Prefer using the actual max index of affected nodes since it means less + // unblocking + if maxIndex != 0 { + index = maxIndex } return resp, index, nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9b7cbc060..18ebde6e2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2825,6 +2825,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy) copy.DeploymentStatus.Timestamp = ts copy.DeploymentStatus.ModifyIndex = index + copy.ModifyIndex = index if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil { return fmt.Errorf("error updating deployment: %v", err) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 91298c97b..5ace4bc8c 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -111,6 +111,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st // For each alloc, add the resources for _, alloc := range allocs { + // Do not consider the resource impact of terminal allocations + if alloc.TerminalStatus() { + continue + } + if alloc.Resources != nil { if err := used.Add(alloc.Resources); err != nil { return false, "", nil, err diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 7c0ba5dca..cbda393c6 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -216,6 +216,91 @@ func TestAllocsFit(t *testing.T) { } +func TestAllocsFit_TerminalAlloc(t *testing.T) { + n := &Node{ + Resources: &Resources{ + CPU: 2000, + MemoryMB: 2048, + DiskMB: 10000, + IOPS: 100, + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "10.0.0.0/8", + MBits: 100, + }, + }, + }, + Reserved: &Resources{ + CPU: 1000, + MemoryMB: 1024, + DiskMB: 5000, + IOPS: 50, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "10.0.0.1", + MBits: 50, + ReservedPorts: []Port{{"main", 80}}, + }, + }, + }, + } + + a1 := &Allocation{ + Resources: &Resources{ + CPU: 1000, + MemoryMB: 1024, + DiskMB: 5000, + IOPS: 50, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "10.0.0.1", + MBits: 50, + ReservedPorts: []Port{{"main", 8000}}, + }, + }, + }, + } + + // Should fit one allocation + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !fit { + t.Fatalf("Bad") + } + + // Sanity check the used resources + if used.CPU != 2000 { + t.Fatalf("bad: %#v", used) + } + if used.MemoryMB != 2048 { + t.Fatalf("bad: %#v", used) + } + + // Should fit second allocation since it is terminal + a2 := a1.Copy() + a2.DesiredStatus = AllocDesiredStatusStop + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !fit { + t.Fatalf("Bad") + } + + // Sanity check the used resources + if used.CPU != 2000 { + t.Fatalf("bad: %#v", used) + } + if used.MemoryMB != 2048 { + t.Fatalf("bad: %#v", used) + } +} + func TestScoreFit(t *testing.T) { node := &Node{} node.Resources = &Resources{ diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1588cc101..e4980edd2 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6751,6 +6751,7 @@ const ( EvalTriggerFailedFollowUp = "failed-follow-up" EvalTriggerMaxPlans = "max-plan-attempts" EvalTriggerRetryFailedAlloc = "alloc-failure" + EvalTriggerQueuedAllocs = "queued-allocs" ) const ( @@ -6882,8 +6883,11 @@ type Evaluation struct { LeaderACL string // SnapshotIndex is the Raft index of the snapshot used to process the - // evaluation. As such it will only be set once it has gone through the - // scheduler. + // evaluation. The index will either be set when it has gone through the + // scheduler or if a blocked evaluation is being created. The index is set + // in this case so we can determine if an early unblocking is required since + // capacity has changed since the evaluation was created. This can result in + // the SnapshotIndex being less than the CreateIndex. SnapshotIndex uint64 // Raft Indexes @@ -7013,7 +7017,7 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, Namespace: e.Namespace, Priority: e.Priority, Type: e.Type, - TriggeredBy: e.TriggeredBy, + TriggeredBy: EvalTriggerQueuedAllocs, JobID: e.JobID, JobModifyIndex: e.JobModifyIndex, Status: EvalStatusBlocked, @@ -7138,6 +7142,10 @@ func (p *Plan) PopUpdate(alloc *Allocation) { func (p *Plan) AppendAlloc(alloc *Allocation) { node := alloc.NodeID existing := p.NodeAllocation[node] + + // Normalize the job + alloc.Job = nil + p.NodeAllocation[node] = append(existing, alloc) } diff --git a/nomad/worker.go b/nomad/worker.go index b6bab0bbc..75b20c1c7 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -114,18 +114,21 @@ func (w *Worker) run() { // Check for a shutdown if w.srv.IsShutdown() { + w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval)) w.sendAck(eval.ID, token, false) return } // Wait for the raft log to catchup to the evaluation if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil { + w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) w.sendAck(eval.ID, token, false) continue } // Invoke the scheduler to determine placements if err := w.invokeScheduler(eval, token); err != nil { + w.logger.Error("error invoking scheduler", "error", err) w.sendAck(eval.ID, token, false) continue } @@ -326,7 +329,7 @@ SUBMIT: } return nil, nil, err } else { - w.logger.Debug("submitted plan for evaluation", "plan_resp_index", resp.Index, "eval_id", plan.EvalID) + w.logger.Debug("submitted plan for evaluation", "eval_id", plan.EvalID) w.backoffReset() } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index eb2b8023f..1094bc6a7 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -128,9 +128,10 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister, structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate, - structs.EvalTriggerRollingUpdate, + structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, - structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc: + structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, + structs.EvalTriggerFailedFollowUp: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index d735c2489..b20f43237 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -241,6 +241,10 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { t.Fatalf("bad: %#v", h.CreateEvals) } + if h.CreateEvals[0].TriggeredBy != structs.EvalTriggerQueuedAllocs { + t.Fatalf("bad: %#v", h.CreateEvals[0]) + } + // Ensure the plan allocated only one allocation var planned []*structs.Allocation for _, allocList := range plan.NodeAllocation { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 53131e3b4..b1a243a99 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -60,7 +60,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { // Verify the evaluation trigger reason is understood switch eval.TriggeredBy { - case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, + case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain: default: