cancel deployments

This commit is contained in:
Alex Dadgar 2017-05-18 12:36:04 -07:00
parent c418c409b1
commit b3f4db0930
6 changed files with 211 additions and 17 deletions

View File

@ -126,13 +126,10 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Grab the job
job := plan.Job
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
},
CreatedDeployment: plan.CreatedDeployment,

View File

@ -3710,6 +3710,11 @@ const (
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusPaused = "paused"
// DeploymentStatusDescriptions are the various descriptions of the states a
// deployment can be in.
DeploymentStatusDescriptionStoppedJob = "Cancelled because job is stopped"
DeploymentStatusDescriptionNewerJob = "Cancelled due to newer version of job"
)
// Deployment is the object that represents a job deployment which is used to
@ -4591,9 +4596,23 @@ func (p *Plan) AppendAlloc(alloc *Allocation) {
p.NodeAllocation[node] = append(existing, alloc)
}
// AppendDeploymentUpdate attaches an deployment update to the plan for the
// given deployment ID.
func (p *Plan) AppendDeploymentUpdate(id, status, description string) {
update := &DeploymentStatusUpdate{
DeploymentID: id,
Status: status,
StatusDescription: description,
}
p.DeploymentUpdates = append(p.DeploymentUpdates, update)
}
// IsNoOp checks if this plan would do nothing
func (p *Plan) IsNoOp() bool {
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0
return len(p.NodeUpdate) == 0 &&
len(p.NodeAllocation) == 0 &&
p.CreatedDeployment == nil &&
len(p.DeploymentUpdates) == 0
}
// PlanResult is the result of a plan submitted to the leader.

View File

@ -73,6 +73,8 @@ type GenericScheduler struct {
limitReached bool
nextEval *structs.Evaluation
deployment *structs.Deployment
blocked *structs.Evaluation
failedTGAllocs map[string]*structs.AllocMetric
queuedAllocs map[string]int
@ -187,11 +189,12 @@ func (s *GenericScheduler) process() (bool, error) {
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
return false, fmt.Errorf("failed to get job %q: %v", s.eval.JobID, err)
}
numTaskGroups := 0
if !s.job.Stopped() {
stopped := s.job.Stopped()
if !stopped {
numTaskGroups = len(s.job.TaskGroups)
}
s.queuedAllocs = make(map[string]int, numTaskGroups)
@ -199,6 +202,30 @@ func (s *GenericScheduler) process() (bool, error) {
// Create a plan
s.plan = s.eval.MakePlan(s.job)
if !s.batch {
// Get any existing deployment
s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job deployment %q: %v", s.eval.JobID, err)
}
// If there is an existing deployment, it should be cancelled under the
// following scenarios:
// 1) The current job is stopped.
// 2) The deployment is for an older version of the job.
if d := s.deployment; d != nil {
if stopped {
s.plan.AppendDeploymentUpdate(d.ID, structs.DeploymentStatusCancelled,
structs.DeploymentStatusDescriptionStoppedJob)
s.deployment = nil
} else if d.JobCreateIndex != s.job.CreateIndex || d.JobModifyIndex != s.job.JobModifyIndex {
s.plan.AppendDeploymentUpdate(d.ID, structs.DeploymentStatusCancelled,
structs.DeploymentStatusDescriptionNewerJob)
s.deployment = nil
}
}
}
// Reset the failed allocations
s.failedTGAllocs = nil
@ -234,6 +261,7 @@ func (s *GenericScheduler) process() (bool, error) {
return true, nil
}
// XXX Don't need a next rolling update eval
// If the limit of placements was reached we need to create an evaluation
// to pickup from here after the stagger period.
if s.limitReached && s.nextEval == nil {

View File

@ -2845,3 +2845,140 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// This test ensures that when a job is stopped, the scheduler properly cancels
// an outstanding deployment.
func TestServiceSched_CancelDeployment_Stopped(t *testing.T) {
h := NewHarness(t)
// Generate a fake job
job := mock.Job()
job.JobModifyIndex = job.CreateIndex + 1
job.ModifyIndex = job.CreateIndex + 1
job.Stop = true
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a deployment
d := mock.Deployment()
d.JobID = job.ID
d.JobCreateIndex = job.CreateIndex
d.JobModifyIndex = job.JobModifyIndex - 1
noErr(t, h.State.UpsertDeployment(h.NextIndex(), d, false))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan cancelled the existing deployment
ws := memdb.NewWatchSet()
out, err := h.State.LatestDeploymentByJobID(ws, job.ID)
noErr(t, err)
if out == nil {
t.Fatalf("No deployment for job")
}
if out.ID != d.ID {
t.Fatalf("Latest deployment for job is different than original deployment")
}
if out.Status != structs.DeploymentStatusCancelled {
t.Fatalf("Deployment status is %q, want %q", out.Status, structs.DeploymentStatusCancelled)
}
if out.StatusDescription != structs.DeploymentStatusDescriptionStoppedJob {
t.Fatalf("Deployment status description is %q, want %q",
out.StatusDescription, structs.DeploymentStatusDescriptionStoppedJob)
}
// Ensure the plan didn't allocate anything
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 0 {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// This test ensures that when a job is updated and had an old deployment, the scheduler properly cancels
// the deployment.
func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) {
h := NewHarness(t)
// Generate a fake job
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a deployment for an old version of the job
d := mock.Deployment()
d.JobID = job.ID
noErr(t, h.State.UpsertDeployment(h.NextIndex(), d, false))
// Upsert again to bump job version
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to kick the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan cancelled the existing deployment
ws := memdb.NewWatchSet()
out, err := h.State.LatestDeploymentByJobID(ws, job.ID)
noErr(t, err)
if out == nil {
t.Fatalf("No deployment for job")
}
if out.ID != d.ID {
t.Fatalf("Latest deployment for job is different than original deployment")
}
if out.Status != structs.DeploymentStatusCancelled {
t.Fatalf("Deployment status is %q, want %q", out.Status, structs.DeploymentStatusCancelled)
}
if out.StatusDescription != structs.DeploymentStatusDescriptionNewerJob {
t.Fatalf("Deployment status description is %q, want %q",
out.StatusDescription, structs.DeploymentStatusDescriptionNewerJob)
}
// Ensure the plan didn't allocate anything
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 0 {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

View File

@ -79,6 +79,10 @@ type State interface {
// GetJobByID is used to lookup a job by ID
JobByID(ws memdb.WatchSet, id string) (*structs.Job, error)
// LatestDeploymentByJobID returns the latest deployment matching the given
// job ID
LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error)
}
// Planner interface is used to submit a task allocation plan.

View File

@ -6,6 +6,7 @@ import (
"os"
"sync"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
@ -108,19 +109,27 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
allocs = append(allocs, allocList...)
}
// Attach the plan to all the allocations. It is pulled out in the
// payload to avoid the redundancy of encoding, but should be denormalized
// prior to being inserted into MemDB.
if j := plan.Job; j != nil {
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
for _, alloc := range allocs {
if alloc.Job == nil {
alloc.Job = j
}
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: allocs,
},
CreatedDeployment: plan.CreatedDeployment,
DeploymentUpdates: plan.DeploymentUpdates,
}
// Apply the full plan
err := h.State.UpsertAllocs(index, allocs)
err := h.State.UpsertPlanResults(index, &req)
return result, nil, err
}