scheduler: create placements for non-register MRD (#15325)

* scheduler: create placements for non-register MRD

For multiregion jobs, the scheduler does not create placements on
registration because the deployment must wait for the other regions.
Once of these regions will then trigger the deployment to run.

Currently, this is done in the scheduler by considering any eval for a
multiregion job as "paused" since it's expected that another region will
eventually unpause it.

This becomes a problem where evals not triggered by a job registration
happen, such as on a node update. These types of regional changes do not
have other regions waiting to progress the deployment, and so they were
never resulting in placements.

The fix is to create a deployment at job registration time. This
additional piece of state allows the scheduler to differentiate between
a multiregion change, where there are other regions engaged in the
deployment so no placements are required, from a regional change, where
the scheduler does need to create placements.

This deployment starts in the new "initializing" status to signal to the
scheduler that it needs to compute the initial deployment state. The
multiregion deployment will wait until this deployment state is
persisted and its starts is set to "pending". Without this state
transition it's possible to hit a race condition where the plan applier
and the deployment watcher may step of each other and overwrite their
changes.

* changelog: add entry for #15325
This commit is contained in:
Luiz Aoqui 2022-11-25 12:45:34 -05:00 committed by GitHub
parent 9c85315bd2
commit 8f91be26ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 123 additions and 61 deletions

3
.changelog/15325.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates.
```

View File

@ -613,10 +613,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index
}
}
if req.Deployment != nil {
// Cancel any preivous deployment.
lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID)
if err != nil {
return fmt.Errorf("failed to retrieve latest deployment: %v", err)
}
if lastDeployment != nil && lastDeployment.Active() {
activeDeployment := lastDeployment.Copy()
activeDeployment.Status = structs.DeploymentStatusCancelled
activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
if err := n.state.UpsertDeployment(index, activeDeployment); err != nil {
return err
}
}
// Update the deployment with the latest job indexes.
req.Deployment.JobCreateIndex = req.Job.CreateIndex
req.Deployment.JobModifyIndex = req.Job.ModifyIndex
req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex
req.Deployment.JobVersion = req.Job.Version
if err := n.state.UpsertDeployment(index, req.Deployment); err != nil {
return err
}
}
// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if req.Deployment != nil {
req.Eval.DeploymentID = req.Deployment.ID
}
if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}

View File

@ -364,6 +364,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
submittedEval = true
}
// Pre-register a deployment if necessary.
args.Deployment = j.multiregionCreateDeployment(job, eval)
// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {

View File

@ -12,6 +12,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job, nomadACLToken *s
return nil, nil
}
// multiregionCreateDeployment is used to create a deployment to register along
// with the job, if required.
func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment {
return nil
}
// multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) {
return false, nil

View File

@ -536,8 +536,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
return txn.Commit()
}
// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
// UpsertDeployment is used to insert or update a new deployment.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

View File

@ -614,6 +614,10 @@ type JobRegisterRequest struct {
// Eval is the evaluation that is associated with the job registration
Eval *Evaluation
// Deployment is the deployment to be create when the job is registered. If
// there is an active deployment for the job it will be canceled.
Deployment *Deployment
WriteRequest
}
@ -9216,14 +9220,15 @@ func (v *Vault) Validate() error {
const (
// DeploymentStatuses are the various states a deployment can be be in
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusInitializing = "initializing"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"
// TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag
@ -9357,7 +9362,8 @@ func (d *Deployment) Copy() *Deployment {
// Active returns whether the deployment is active or terminal.
func (d *Deployment) Active() bool {
switch d.Status {
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending:
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked,
DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending:
return true
default:
return false

View File

@ -231,24 +231,35 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
}
func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// Mark the deployment as complete if possible
if a.deployment != nil && deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
if a.deployment != nil {
// Mark the deployment as complete if possible
if deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
} else {
}
// Mark the deployment as pending since its state is now computed.
if a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}
@ -269,26 +280,18 @@ func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// allocReconciler that indicate the state of the deployment if one
// is required. The flags that are managed are:
// 1. deploymentFailed: Did the current deployment fail just as named.
// 2. deploymentPaused: Multiregion job types that use deployments run
// the deployments later during the fan-out stage. When the deployment
// is created it will be in a pending state. If an invariant violation
// is detected by the deploymentWatcher during it will enter a paused
// state. This flag tells Compute we're paused or pending, so we should
// not make placements on the deployment.
// 2. deploymentPaused: Set to true when the current deployment is paused,
// which is usually a manual user operation, or if the deployment is
// pending or initializing, which are the initial states for multi-region
// job deployments. This flag tells Compute that we should not make
// placements on the deployment.
func (a *allocReconciler) computeDeploymentPaused() {
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deployment.Status == structs.DeploymentStatusPending ||
a.deployment.Status == structs.DeploymentStatusInitializing
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
if a.job.IsMultiregion() &&
a.job.UsesDeployments() &&
!(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true
}
}
}
// cancelUnneededDeployments cancels any deployment that is not needed. If the
@ -512,6 +515,12 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)
// Deployments that are still initializing need to be sent in full in the
// plan so its internal state can be persisted by the plan applier.
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deployment = a.deployment
}
deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
migrate, rescheduleNow, place, rescheduleLater, requiresCanaries)
@ -889,11 +898,6 @@ func (a *allocReconciler) createDeployment(groupName string, strategy *structs.U
// A previous group may have made the deployment already. If not create one.
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job, a.evalPriority)
// in multiregion jobs, most deployments start in a pending state
if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
a.deployment.Status = structs.DeploymentStatusPending
a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
a.result.deployment = a.deployment
}

View File

@ -6073,22 +6073,6 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
isParameterized: false,
expected: true,
},
{
name: "multiregion periodic service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
isPeriodic: true,
isParameterized: false,
expected: false,
},
{
name: "multiregion parameterized service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
isPeriodic: false,
isParameterized: true,
expected: false,
},
{
name: "single region batch job is not paused",
jobType: structs.JobTypeBatch,
@ -6105,6 +6089,22 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
isParameterized: false,
expected: false,
},
{
name: "multiregion parameterized batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: true,
isPeriodic: false,
isParameterized: true,
expected: false,
},
{
name: "multiregion periodic batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: true,
isPeriodic: true,
isParameterized: false,
expected: false,
},
}
for _, tc := range testCases {
@ -6119,8 +6119,18 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
require.NotNil(t, job, "invalid job type", tc.jobType)
var deployment *structs.Deployment
if tc.isMultiregion {
job.Multiregion = multiregionCfg
// This deployment is created by the Job.Register RPC and
// fetched by the scheduler before handing it to the
// reconciler.
if job.UsesDeployments() {
deployment = structs.NewDeployment(job, 100)
deployment.Status = structs.DeploymentStatusInitializing
deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
}
if tc.isPeriodic {
@ -6132,8 +6142,8 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
}
reconciler := NewAllocReconciler(
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, nil, nil, "", job.Priority, true)
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment,
nil, nil, "", job.Priority, true)
_ = reconciler.Compute()