diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index a3f7e7421..96b6edca9 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -126,13 +126,10 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap minUpdates := len(result.NodeUpdate) minUpdates += len(result.NodeAllocation) - // Grab the job - job := plan.Job - // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: job, + Job: plan.Job, Alloc: make([]*structs.Allocation, 0, minUpdates), }, CreatedDeployment: plan.CreatedDeployment, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 13a2c5015..a595f9860 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3710,6 +3710,11 @@ const ( DeploymentStatusSuccessful = "successful" DeploymentStatusCancelled = "cancelled" DeploymentStatusPaused = "paused" + + // DeploymentStatusDescriptions are the various descriptions of the states a + // deployment can be in. + DeploymentStatusDescriptionStoppedJob = "Cancelled because job is stopped" + DeploymentStatusDescriptionNewerJob = "Cancelled due to newer version of job" ) // Deployment is the object that represents a job deployment which is used to @@ -4591,9 +4596,23 @@ func (p *Plan) AppendAlloc(alloc *Allocation) { p.NodeAllocation[node] = append(existing, alloc) } +// AppendDeploymentUpdate attaches an deployment update to the plan for the +// given deployment ID. +func (p *Plan) AppendDeploymentUpdate(id, status, description string) { + update := &DeploymentStatusUpdate{ + DeploymentID: id, + Status: status, + StatusDescription: description, + } + p.DeploymentUpdates = append(p.DeploymentUpdates, update) +} + // IsNoOp checks if this plan would do nothing func (p *Plan) IsNoOp() bool { - return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 + return len(p.NodeUpdate) == 0 && + len(p.NodeAllocation) == 0 && + p.CreatedDeployment == nil && + len(p.DeploymentUpdates) == 0 } // PlanResult is the result of a plan submitted to the leader. diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 43f38f15e..a12cd1a8d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -73,6 +73,8 @@ type GenericScheduler struct { limitReached bool nextEval *structs.Evaluation + deployment *structs.Deployment + blocked *structs.Evaluation failedTGAllocs map[string]*structs.AllocMetric queuedAllocs map[string]int @@ -187,11 +189,12 @@ func (s *GenericScheduler) process() (bool, error) { ws := memdb.NewWatchSet() s.job, err = s.state.JobByID(ws, s.eval.JobID) if err != nil { - return false, fmt.Errorf("failed to get job '%s': %v", - s.eval.JobID, err) + return false, fmt.Errorf("failed to get job %q: %v", s.eval.JobID, err) } + numTaskGroups := 0 - if !s.job.Stopped() { + stopped := s.job.Stopped() + if !stopped { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) @@ -199,6 +202,30 @@ func (s *GenericScheduler) process() (bool, error) { // Create a plan s.plan = s.eval.MakePlan(s.job) + if !s.batch { + // Get any existing deployment + s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get job deployment %q: %v", s.eval.JobID, err) + } + + // If there is an existing deployment, it should be cancelled under the + // following scenarios: + // 1) The current job is stopped. + // 2) The deployment is for an older version of the job. + if d := s.deployment; d != nil { + if stopped { + s.plan.AppendDeploymentUpdate(d.ID, structs.DeploymentStatusCancelled, + structs.DeploymentStatusDescriptionStoppedJob) + s.deployment = nil + } else if d.JobCreateIndex != s.job.CreateIndex || d.JobModifyIndex != s.job.JobModifyIndex { + s.plan.AppendDeploymentUpdate(d.ID, structs.DeploymentStatusCancelled, + structs.DeploymentStatusDescriptionNewerJob) + s.deployment = nil + } + } + } + // Reset the failed allocations s.failedTGAllocs = nil @@ -234,6 +261,7 @@ func (s *GenericScheduler) process() (bool, error) { return true, nil } + // XXX Don't need a next rolling update eval // If the limit of placements was reached we need to create an evaluation // to pickup from here after the stagger period. if s.limitReached && s.nextEval == nil { diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index afb0e48d4..e1616b22e 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2845,3 +2845,140 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +// This test ensures that when a job is stopped, the scheduler properly cancels +// an outstanding deployment. +func TestServiceSched_CancelDeployment_Stopped(t *testing.T) { + h := NewHarness(t) + + // Generate a fake job + job := mock.Job() + job.JobModifyIndex = job.CreateIndex + 1 + job.ModifyIndex = job.CreateIndex + 1 + job.Stop = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a deployment + d := mock.Deployment() + d.JobID = job.ID + d.JobCreateIndex = job.CreateIndex + d.JobModifyIndex = job.JobModifyIndex - 1 + noErr(t, h.State.UpsertDeployment(h.NextIndex(), d, false)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan cancelled the existing deployment + ws := memdb.NewWatchSet() + out, err := h.State.LatestDeploymentByJobID(ws, job.ID) + noErr(t, err) + + if out == nil { + t.Fatalf("No deployment for job") + } + if out.ID != d.ID { + t.Fatalf("Latest deployment for job is different than original deployment") + } + if out.Status != structs.DeploymentStatusCancelled { + t.Fatalf("Deployment status is %q, want %q", out.Status, structs.DeploymentStatusCancelled) + } + if out.StatusDescription != structs.DeploymentStatusDescriptionStoppedJob { + t.Fatalf("Deployment status description is %q, want %q", + out.StatusDescription, structs.DeploymentStatusDescriptionStoppedJob) + } + + // Ensure the plan didn't allocate anything + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 0 { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +// This test ensures that when a job is updated and had an old deployment, the scheduler properly cancels +// the deployment. +func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) { + h := NewHarness(t) + + // Generate a fake job + job := mock.Job() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a deployment for an old version of the job + d := mock.Deployment() + d.JobID = job.ID + noErr(t, h.State.UpsertDeployment(h.NextIndex(), d, false)) + + // Upsert again to bump job version + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to kick the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan cancelled the existing deployment + ws := memdb.NewWatchSet() + out, err := h.State.LatestDeploymentByJobID(ws, job.ID) + noErr(t, err) + + if out == nil { + t.Fatalf("No deployment for job") + } + if out.ID != d.ID { + t.Fatalf("Latest deployment for job is different than original deployment") + } + if out.Status != structs.DeploymentStatusCancelled { + t.Fatalf("Deployment status is %q, want %q", out.Status, structs.DeploymentStatusCancelled) + } + if out.StatusDescription != structs.DeploymentStatusDescriptionNewerJob { + t.Fatalf("Deployment status description is %q, want %q", + out.StatusDescription, structs.DeploymentStatusDescriptionNewerJob) + } + // Ensure the plan didn't allocate anything + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 0 { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ddbf855c4..995b3affd 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -79,6 +79,10 @@ type State interface { // GetJobByID is used to lookup a job by ID JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) + + // LatestDeploymentByJobID returns the latest deployment matching the given + // job ID + LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error) } // Planner interface is used to submit a task allocation plan. diff --git a/scheduler/testing.go b/scheduler/testing.go index 74c01c486..af1411db1 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -6,6 +6,7 @@ import ( "os" "sync" "testing" + "time" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/state" @@ -108,19 +109,27 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er allocs = append(allocs, allocList...) } - // Attach the plan to all the allocations. It is pulled out in the - // payload to avoid the redundancy of encoding, but should be denormalized - // prior to being inserted into MemDB. - if j := plan.Job; j != nil { - for _, alloc := range allocs { - if alloc.Job == nil { - alloc.Job = j - } + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + now := time.Now().UTC().UnixNano() + for _, alloc := range allocs { + if alloc.CreateTime == 0 { + alloc.CreateTime = now } } + // Setup the update request + req := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + Job: plan.Job, + Alloc: allocs, + }, + CreatedDeployment: plan.CreatedDeployment, + DeploymentUpdates: plan.DeploymentUpdates, + } + // Apply the full plan - err := h.State.UpsertAllocs(index, allocs) + err := h.State.UpsertPlanResults(index, &req) return result, nil, err }