Respect alloc job version for lost/failed allocs

This change fixes a bug where lost/failed allocations are replaced by
allocations running the latest job version, even if that version hasn't
been promoted yet.

Now, when generating a plan for lost/failed allocations, the scheduler
first checks whether the current deployment is in the canary stage, and
if so, it ensures that any lost/failed allocation is replaced with one
running the latest promoted version instead.
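
As a rough illustration of the selection rule (a minimal sketch under assumed
names; pickDowngradeVersion is hypothetical, the real logic lives in
downgradedJobForPlacement below): the scheduler walks the job's deployments
from newest to oldest and picks the first version that was either promoted or
never used canaries.

// pickDowngradeVersion is a hypothetical helper sketching the selection
// rule; it is not part of this commit.
func pickDowngradeVersion(deployments []*structs.Deployment, tgName string) (uint64, bool) {
    // Walk deployments from the highest job version down.
    sort.Slice(deployments, func(i, j int) bool {
        return deployments[i].JobVersion > deployments[j].JobVersion
    })
    for _, d := range deployments {
        // A promoted deployment, or one that never used canaries, is safe to reuse.
        if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
            return d.JobVersion, true
        }
    }
    return 0, false
}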
Mahmood Ali 2020-08-13 09:35:09 -04:00
parent a49732816c
commit 8a342926b7
10 changed files with 278 additions and 14 deletions

View File

@@ -5,6 +5,10 @@ IMPROVEMENTS:
* api: Added node purge SDK functionality. [[GH-8142](https://github.com/hashicorp/nomad/issues/8142)]
* driver/docker: Allow configurable image pull context timeout setting. [[GH-5718](https://github.com/hashicorp/nomad/issues/5718)]
BUG FIXES:
* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
## 0.12.3 (August 13, 2020)
BUG FIXES:

View File

@@ -9830,12 +9830,18 @@ func (p *Plan) PopUpdate(alloc *Allocation) {
}
}
func (p *Plan) AppendAlloc(alloc *Allocation) {
// AppendAlloc appends the alloc to the plan allocations.
// To save space, it clears the Job field so it can be derived from the plan Job.
// If keepJob is true, the normalization is skipped to accommodate cases where a plan
// needs to support multiple versions of the same job.
func (p *Plan) AppendAlloc(alloc *Allocation, keepJob bool) {
node := alloc.NodeID
existing := p.NodeAllocation[node]
// Normalize the job
alloc.Job = nil
if !keepJob {
alloc.Job = nil
}
p.NodeAllocation[node] = append(existing, alloc)
}
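
The flag's contract in a condensed sketch (the alloc names are invented for
illustration): a normally placed alloc has its Job cleared and re-derived from
the plan's Job on apply, while a downgraded alloc must carry its own, older Job
because it differs from the plan's.

plan.AppendAlloc(updatedAlloc, false)   // Job cleared; re-derived from plan.Job
plan.AppendAlloc(downgradedAlloc, true) // Job kept; it differs from plan.Job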

View File

@@ -2,6 +2,7 @@ package scheduler
import (
"fmt"
"sort"
"time"
log "github.com/hashicorp/go-hclog"
@@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
update.DeploymentID = s.deployment.GetID()
update.DeploymentStatus = nil
}
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, false)
}
// Handle the annotation updates
for _, update := range results.attributeUpdates {
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, false)
}
// Nothing remaining to do if placement is not required
@@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error {
return s.computePlacements(destructive, place)
}
// downgradedJobForPlacement returns the deployment ID and job appropriate for replacing a non-canary placement
func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) {
ns, jobID := s.job.Namespace, s.job.ID
tgName := p.TaskGroup().Name
// find deployments and use the latest promoted or canaried version
deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false)
if err != nil {
return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err)
}
sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[j].JobVersion })
for _, d := range deployments {
// It's unexpected for a recent deployment to be missing the TaskGroup, as all of
// its allocations should have been destroyed by then. In such cases, keep searching
// older deployments for one that does contain the TaskGroup; the stray allocation
// will presumably be killed soon. This is a defensive measure that has not been
// seen in practice.
//
// Zero dstate.DesiredCanaries indicates that the TaskGroup allocs were updated
// in-place without using canaries.
if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion)
return d.ID, job, err
}
}
return "", nil, nil
}
// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
@@ -457,6 +484,31 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Get the task group
tg := missing.TaskGroup()
var downgradedJob *structs.Job
if missing.DowngradeNonCanary() {
jobDeploymentID, job, err := s.downgradedJobForPlacement(missing)
if err != nil {
return err
}
// Defensive check - if there is no appropriate deployment for this job, use the latest
if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil {
tg = job.LookupTaskGroup(tg.Name)
downgradedJob = job
deploymentID = jobDeploymentID
// ensure we are operating on the correct job
s.stack.SetJob(job)
} else {
jobVersion := -1
if job != nil {
jobVersion = int(job.Version)
}
s.logger.Warn("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion(), "found_version", jobVersion)
}
}
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
@@ -489,6 +541,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
// restore stack to use the latest job version again
if downgradedJob != nil {
s.stack.SetJob(s.job)
}
// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
@@ -544,10 +601,14 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
}
}
if downgradedJob != nil {
alloc.Job = downgradedJob
}
s.handlePreemptions(option, alloc, missing)
// Track the placement
s.plan.AppendAlloc(alloc)
s.plan.AppendAlloc(alloc, downgradedJob != nil)
} else {
// Lazy initialize the failed map

View File

@@ -5342,3 +5342,162 @@ func TestServiceSched_Preemption(t *testing.T) {
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}
// TestServiceSched_Migrate_CanaryStatus asserts that migrated/rescheduled
// allocations use the proper job versions rather than the latest:
// Canaries should be replaced by canaries, and non-canaries should be replaced
// with the latest promoted version.
func TestServiceSched_Migrate_CanaryStatus(t *testing.T) {
h := NewHarness(t)
node1 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))
totalCount := 3
desiredCanaries := 1
job := mock.Job()
job.Stable = true
job.TaskGroups[0].Count = totalCount
job.TaskGroups[0].Update = &structs.UpdateStrategy{
MaxParallel: 1,
Canary: desiredCanaries,
}
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))
deployment := &structs.Deployment{
ID: uuid.Generate(),
JobID: job.ID,
Namespace: job.Namespace,
JobVersion: job.Version,
JobModifyIndex: job.JobModifyIndex,
JobCreateIndex: job.CreateIndex,
TaskGroups: map[string]*structs.DeploymentState{
"web": {DesiredTotal: totalCount},
},
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
}
require.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))
var allocs []*structs.Allocation
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node1.ID
alloc.DeploymentID = deployment.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// new job version with an updated task config
job2 := job.Copy()
job2.Stable = false
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure a deployment was created
require.NotNil(t, plan.Deployment)
updateDeployment := plan.Deployment.ID
// Check status first - should be 4 allocs, only one is canary
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 4)
sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex })
for _, a := range allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.False(t, a.DeploymentStatus.IsCanary())
require.Equal(t, node1.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, allocs[3].DesiredStatus)
require.Equal(t, uint64(1), allocs[3].Job.Version)
require.True(t, allocs[3].DeploymentStatus.Canary)
require.Equal(t, node1.ID, allocs[3].NodeID)
require.Equal(t, updateDeployment, allocs[3].DeploymentID)
}
// now, mark node1 as down and ensure all allocs are migrated to node2
node1 = node1.Copy()
node1.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))
node2 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node2))
neval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node1.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{neval}))
// Process the evaluation
err = h.Process(NewServiceScheduler, neval)
require.NoError(t, err)
// Now test that all node1 allocs are migrated while preserving Version and Canary info
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 8)
nodeAllocs := map[string][]*structs.Allocation{}
for _, a := range allocs {
nodeAllocs[a.NodeID] = append(nodeAllocs[a.NodeID], a)
}
require.Len(t, nodeAllocs[node1.ID], 4)
for _, a := range nodeAllocs[node1.ID] {
require.Equal(t, structs.AllocDesiredStatusStop, a.DesiredStatus)
require.Equal(t, node1.ID, a.NodeID)
}
node2Allocs := nodeAllocs[node2.ID]
require.Len(t, node2Allocs, 4)
sort.Slice(node2Allocs, func(i, j int) bool { return node2Allocs[i].Job.Version < node2Allocs[j].Job.Version })
for _, a := range node2Allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.Equal(t, node2.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, node2Allocs[3].DesiredStatus)
require.Equal(t, uint64(1), node2Allocs[3].Job.Version)
require.Equal(t, node2.ID, node2Allocs[3].NodeID)
require.Equal(t, updateDeployment, node2Allocs[3].DeploymentID)
}
}

View File

@@ -424,10 +424,10 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// We need to create canaries if there are destructive updates, or if all
// existing allocations are being replaced (migrating or lost), and we have
// fewer canaries than are desired
numDestructive := len(destructive)
strategy := tg.Update
canariesPromoted := dstate != nil && dstate.Promoted
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
requireCanary := (len(destructive) != 0 || (len(untainted) == 0 && len(migrate)+len(lost) != 0)) &&
strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
if requireCanary {
dstate.DesiredCanaries = strategy.Canary
}
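
For intuition, the new condition can be restated as a standalone predicate (an
illustrative sketch with invented names, not code from this commit): canaries
are now also required when no untainted allocations remain but some are
migrating or lost, not only when destructive updates exist.

// requireCanarySketch restates the reconciler's new canary check.
func requireCanarySketch(destructive, untainted, migrate, lost, canaries, desiredCanaries int,
    hasUpdateStrategy, canariesPromoted bool) bool {
    replacingAll := untainted == 0 && migrate+lost != 0
    return (destructive != 0 || replacingAll) &&
        hasUpdateStrategy && canaries < desiredCanaries && !canariesPromoted
}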
@@ -455,7 +455,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -533,9 +533,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
canary: false,
canary: alloc.DeploymentStatus.IsCanary(),
taskGroup: tg,
previousAlloc: alloc,
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}
@@ -708,7 +711,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
// computePlacement returns the set of allocations to place given the group
// definition, the set of untainted, migrating and reschedule allocations for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult {
// Add rescheduled placement results
var place []allocPlaceResult
@@ -719,6 +722,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
previousAlloc: alloc,
reschedule: true,
canary: alloc.DeploymentStatus.IsCanary(),
downgradeNonCanary: canaryState && (alloc.DeploymentStatus == nil || !alloc.DeploymentStatus.IsCanary()),
minJobVersion: alloc.Job.Version,
})
}
@@ -732,8 +738,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
name: name,
taskGroup: group,
downgradeNonCanary: canaryState,
})
}
}

View File

@@ -34,6 +34,12 @@ type placementResult interface {
// StopPreviousAlloc returns whether the previous allocation should be
// stopped and if so the status description.
StopPreviousAlloc() (bool, string)
// DowngradeNonCanary indicates that the placement should use the latest promoted
// job version that is at least MinJobVersion, rather than the latest job version
DowngradeNonCanary() bool
MinJobVersion() uint64
}
// allocStopResult contains the information required to stop a single allocation
@@ -52,6 +58,9 @@ type allocPlaceResult struct {
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
reschedule bool
downgradeNonCanary bool
minJobVersion uint64
}
func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup }
@@ -60,6 +69,8 @@ func (a allocPlaceResult) Canary() bool { return a.ca
func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc }
func (a allocPlaceResult) IsRescheduling() bool { return a.reschedule }
func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" }
func (a allocPlaceResult) DowngradeNonCanary() bool { return a.downgradeNonCanary }
func (a allocPlaceResult) MinJobVersion() uint64 { return a.minJobVersion }
// allocDestructiveResult contains the information required to do a destructive
// update. Destructive changes should be applied atomically, as in the old alloc
@@ -79,6 +90,8 @@ func (a allocDestructiveResult) IsRescheduling() bool { retur
func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) {
return true, a.stopStatusDescription
}
func (a allocDestructiveResult) DowngradeNonCanary() bool { return false }
func (a allocDestructiveResult) MinJobVersion() uint64 { return 0 }
// allocMatrix is a mapping of task groups to their allocation set.
type allocMatrix map[string]allocSet

View File

@@ -88,6 +88,12 @@ type State interface {
// GetJobByID is used to lookup a job by ID
JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error)
// DeploymentsByJobID returns the deployments associated with the job
DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error)
// JobByIDAndVersion returns the job associated with id and specific version
JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error)
// LatestDeploymentByJobID returns the latest deployment matching the given
// job ID
LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error)

View File

@@ -46,6 +46,7 @@ type GenericStack struct {
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobVersion *uint64
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
@@ -89,6 +90,13 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
}
func (s *GenericStack) SetJob(job *structs.Job) {
if s.jobVersion != nil && *s.jobVersion == job.Version {
return
}
jobVer := job.Version
s.jobVersion = &jobVer
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
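
The jobVersion memoization makes SetJob cheap to call repeatedly: the
feasibility checkers are only re-primed when the version actually changes,
which matters now that computePlacements may flip between a downgraded job and
the latest one per placement. A hypothetical call pattern:

stack.SetJob(downgradedJob) // version 0: primes the constraint checkers
stack.SetJob(downgradedJob) // version 0 again: early return, no work
stack.SetJob(latestJob)     // version 1: re-primes the constraint checkers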

View File

@@ -398,7 +398,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.PreemptedAllocations = preemptedAllocIDs
}
s.plan.AppendAlloc(alloc)
s.plan.AppendAlloc(alloc, false)
}
return nil

View File

@@ -655,7 +655,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
},
}
newAlloc.Metrics = ctx.Metrics()
ctx.Plan().AppendAlloc(newAlloc)
ctx.Plan().AppendAlloc(newAlloc, false)
// Remove this allocation from the slice
doInplace(&i, &n, &inplaceCount)