open-nomad/scheduler/reconcile.go

1368 lines
49 KiB
Go

package scheduler
import (
"fmt"
"sort"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
	// batchedFailedAllocWindowSize is the window size used
	// to batch up failed allocations before creating an eval.
	batchedFailedAllocWindowSize = 5 * time.Second

	// rescheduleWindowSize is the window size relative to the
	// current time within which reschedulable allocations are placed.
	// This helps protect against small clock drifts between servers.
	rescheduleWindowSize = 1 * time.Second
)
// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
// allocation that has the new resources and alloc metrics attached will be
// returned.
//
// Parameters:
//   - existing: the allocation currently recorded for the job.
//   - newJob: the job definition the allocation is being compared against.
//   - newTG: the task group definition the allocation is being compared against.
//
// Results:
//   - ignore: the allocation can ignore the change.
//   - destructive: the allocation requires a destructive update.
//   - updated: the inplace-updated allocation, when an inplace update applies.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)
// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
// existing cluster state. The reconciler should only be used for batch and
// service jobs.
type allocReconciler struct {
	// logger is used to log debug information. Logging should be kept to a
	// minimum here.
	logger log.Logger

	// allocUpdateFn is used to check whether an allocation can ignore a job
	// change, be inplace updated, or requires a destructive update.
	allocUpdateFn allocUpdateType

	// batch marks whether the job is a batch job
	batch bool

	// job is the job being operated on, it may be nil if the job is being
	// stopped via a purge
	job *structs.Job

	// jobID is the ID of the job being operated on. The job may be nil if it is
	// being stopped so we require this separately.
	jobID string

	// oldDeployment is the last deployment for the job
	oldDeployment *structs.Deployment

	// deployment is the current deployment for the job
	deployment *structs.Deployment

	// deploymentPaused marks whether the deployment is paused
	deploymentPaused bool

	// deploymentFailed marks whether the deployment is failed
	deploymentFailed bool

	// taintedNodes contains a map of nodes that are tainted
	taintedNodes map[string]*structs.Node

	// existingAllocs is non-terminal existing allocations
	existingAllocs []*structs.Allocation

	// evalID and evalPriority are the ID and Priority of the evaluation that
	// triggered the reconciler.
	evalID       string
	evalPriority int

	// supportsDisconnectedClients indicates whether all servers meet the required
	// minimum version to allow application of max_client_disconnect configuration.
	supportsDisconnectedClients bool

	// now is the time used when determining rescheduling eligibility.
	// It defaults to time.Now() in NewAllocReconciler, and is overridden in
	// unit tests.
	now time.Time

	// result is the results of the reconcile. During computation it can be
	// used to store intermediate state
	result *reconcileResults
}
// reconcileResults contains the results of the reconciliation and should be
// applied by the scheduler.
type reconcileResults struct {
	// deployment is the deployment that should be created or updated as a
	// result of scheduling
	deployment *structs.Deployment

	// deploymentUpdates contains a set of deployment updates that should be
	// applied as a result of scheduling
	deploymentUpdates []*structs.DeploymentStatusUpdate

	// place is the set of allocations to place by the scheduler
	place []allocPlaceResult

	// destructiveUpdate is the set of allocations to apply a destructive update to
	destructiveUpdate []allocDestructiveResult

	// inplaceUpdate is the set of allocations to apply an inplace update to
	inplaceUpdate []*structs.Allocation

	// stop is the set of allocations to stop
	stop []allocStopResult

	// attributeUpdates are updates to the allocation that are not from a
	// jobspec change.
	attributeUpdates map[string]*structs.Allocation

	// disconnectUpdates is the set of allocations that are on disconnected
	// nodes, but have not yet had their ClientStatus set to
	// AllocClientStatusUnknown.
	disconnectUpdates map[string]*structs.Allocation

	// reconnectUpdates is the set of allocations that have ClientStatus set to
	// AllocClientStatusUnknown, but the associated Node has reconnected.
	reconnectUpdates map[string]*structs.Allocation

	// desiredTGUpdates captures the desired set of changes to make for each
	// task group.
	desiredTGUpdates map[string]*structs.DesiredUpdates

	// desiredFollowupEvals is the map of follow up evaluations to create per
	// task group. This is used to create a delayed evaluation for rescheduling
	// failed allocations.
	desiredFollowupEvals map[string][]*structs.Evaluation
}
// delayedRescheduleInfo contains the allocation ID and a time when it is
// eligible to be rescheduled. This is used to create follow up evaluations.
type delayedRescheduleInfo struct {
	// allocID is the ID of the allocation eligible to be rescheduled
	allocID string

	// alloc is the allocation itself.
	// NOTE(review): presumably the allocation identified by allocID — confirm
	// against the code that populates this struct.
	alloc *structs.Allocation

	// rescheduleTime is the time to use in the delayed evaluation
	rescheduleTime time.Time
}
// GoString returns a multi-line, human-readable summary of the reconcile
// results: the total change counts, the created deployment (if any), each
// staged deployment status update, and the desired changes per task group.
//
// Task groups are emitted in sorted name order so that the output is stable
// across calls; ranging over the map directly would yield a random order.
func (r *reconcileResults) GoString() string {
	base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d) (disconnect %d) (reconnect %d)",
		len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop), len(r.disconnectUpdates), len(r.reconnectUpdates))

	if r.deployment != nil {
		base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
	}

	for _, u := range r.deploymentUpdates {
		base += fmt.Sprintf("\nDeployment Update for ID %q: Status %q; Description %q",
			u.DeploymentID, u.Status, u.StatusDescription)
	}

	// Collect and sort the task group names to get deterministic output.
	groups := make([]string, 0, len(r.desiredTGUpdates))
	for tg := range r.desiredTGUpdates {
		groups = append(groups, tg)
	}
	sort.Strings(groups)

	for _, tg := range groups {
		base += fmt.Sprintf("\nDesired Changes for %q: %#v", tg, r.desiredTGUpdates[tg])
	}

	return base
}
// Changes reports how many changes the results contain, counted as the sum of
// the placements, inplace updates and stops. Destructive updates and
// disconnect/reconnect updates are not part of this count.
func (r *reconcileResults) Changes() int {
	total := len(r.place)
	total += len(r.inplaceUpdate)
	total += len(r.stop)
	return total
}
// NewAllocReconciler builds a reconciler that determines the changes required
// to bring the cluster state in line with the declared jobspec.
//
// The logger is scoped under the name "reconciler", the passed deployment is
// copied rather than shared, the reference time is set to time.Now(), and all
// result maps are initialized empty so they can be written to immediately.
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
	jobID string, job *structs.Job, deployment *structs.Deployment,
	existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
	evalPriority int, supportsDisconnectedClients bool) *allocReconciler {

	reconciler := &allocReconciler{
		logger:                      logger.Named("reconciler"),
		allocUpdateFn:               allocUpdateFn,
		batch:                       batch,
		jobID:                       jobID,
		job:                         job,
		deployment:                  deployment.Copy(),
		existingAllocs:              existingAllocs,
		taintedNodes:                taintedNodes,
		evalID:                      evalID,
		evalPriority:                evalPriority,
		supportsDisconnectedClients: supportsDisconnectedClients,
		now:                         time.Now(),
	}

	// Start from empty (non-nil) maps so later stages can insert freely.
	reconciler.result = &reconcileResults{
		attributeUpdates:     map[string]*structs.Allocation{},
		disconnectUpdates:    map[string]*structs.Allocation{},
		reconnectUpdates:     map[string]*structs.Allocation{},
		desiredTGUpdates:     map[string]*structs.DesiredUpdates{},
		desiredFollowupEvals: map[string][]*structs.Evaluation{},
	}

	return reconciler
}
// Compute reconciles the existing cluster state against the job and returns
// the set of changes required to converge them.
//
// Unneeded deployments are cancelled first. A stopped job then only has its
// allocations stopped; otherwise every task group is reconciled and the
// deployment status updates are derived from the outcome.
func (a *allocReconciler) Compute() *reconcileResults {
	// Build the allocation matrix before touching any deployment state.
	matrix := newAllocMatrix(a.job, a.existingAllocs)

	a.cancelUnneededDeployments()

	if a.job.Stopped() {
		// Stopping a job needs nothing beyond stopping its allocs.
		a.handleStop(matrix)
	} else {
		a.computeDeploymentPaused()
		a.computeDeploymentUpdates(a.computeDeploymentComplete(matrix))
	}

	return a.result
}
// computeDeploymentComplete reconciles every task group in the matrix and
// reports whether all of them consider the deployment complete.
func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
	allComplete := true
	for name, allocs := range m {
		// Every group is reconciled, even after an earlier group has already
		// reported that it is incomplete.
		if !a.computeGroup(name, allocs) {
			allComplete = false
		}
	}
	return allComplete
}
// computeDeploymentUpdates stages the status update for a completed current
// deployment and sets the status description of a newly created deployment
// that requires promotion.
func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
	// Mark the deployment as complete if possible
	if a.deployment != nil && deploymentComplete {
		status := structs.DeploymentStatusSuccessful
		description := structs.DeploymentStatusDescriptionSuccessful
		stage := true

		if a.job.IsMultiregion() {
			// A completed multiregion deployment becomes blocked. The
			// unblocking/successful states come after blocked, so those must
			// not be reverted.
			status = structs.DeploymentStatusBlocked
			description = structs.DeploymentStatusDescriptionBlocked

			current := a.deployment.Status
			if current == structs.DeploymentStatusUnblocking ||
				current == structs.DeploymentStatusSuccessful {
				stage = false
			}
		}

		if stage {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            status,
				StatusDescription: description,
			})
		}
	}

	// Set the description of a created deployment
	created := a.result.deployment
	if created == nil || !created.RequiresPromotion() {
		return
	}
	if created.HasAutoPromote() {
		created.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
		return
	}
	created.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
}
// computeDeploymentPaused derives the deploymentPaused and deploymentFailed
// flags from the current deployment, or from the job when no deployment
// exists yet.
func (a *allocReconciler) computeDeploymentPaused() {
	if a.deployment != nil {
		// Both paused and pending deployments count as paused.
		status := a.deployment.Status
		a.deploymentPaused = status == structs.DeploymentStatusPaused ||
			status == structs.DeploymentStatusPending
		a.deploymentFailed = status == structs.DeploymentStatusFailed
		return
	}

	// There is no deployment yet. When one is created later it will be in a
	// pending state, so Compute must already be told we are paused; otherwise
	// placements would be made on the paused deployment.
	if !a.job.IsMultiregion() {
		return
	}
	if a.job.IsPeriodic() || a.job.IsParameterized() {
		return
	}
	a.deploymentPaused = true
}
// cancelUnneededDeployments cancels any deployment that is no longer needed.
// When the current deployment is not needed it is moved to oldDeployment and
// the deployment field is set to nil. A cancellation update is staged for
// active deployments of jobs that are stopped or have a different version.
//
// Unneeded deployments include:
//  1. Jobs that are marked for stop, but there is a non-terminal deployment.
//  2. Deployments that are active, but referencing a different job version.
//  3. Deployments that are already successful.
func (a *allocReconciler) cancelUnneededDeployments() {
	// stageCancel records a cancellation of the current deployment. It must be
	// called before a.deployment is cleared.
	stageCancel := func(description string) {
		a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
			DeploymentID:      a.deployment.ID,
			Status:            structs.DeploymentStatusCancelled,
			StatusDescription: description,
		})
	}

	// A stopped job cancels its non-terminal deployment and has nothing else
	// to do.
	if a.job.Stopped() {
		if a.deployment != nil && a.deployment.Active() {
			stageCancel(structs.DeploymentStatusDescriptionStoppedJob)
		}
		a.oldDeployment = a.deployment
		a.deployment = nil
		return
	}

	current := a.deployment
	if current == nil {
		return
	}

	// An active deployment that references another job version is cancelled.
	staleVersion := current.JobCreateIndex != a.job.CreateIndex ||
		current.JobVersion != a.job.Version
	if staleVersion && current.Active() {
		stageCancel(structs.DeploymentStatusDescriptionNewerJob)
	}

	// A stale or already successful deployment is no longer the current one.
	if staleVersion || current.Status == structs.DeploymentStatusSuccessful {
		a.oldDeployment = current
		a.deployment = nil
	}
}
// handleStop marks the allocations of every task group to be stopped,
// handling the lost case, and records the stop count as the group's desired
// changes.
func (a *allocReconciler) handleStop(m allocMatrix) {
	for group, allocs := range m {
		remaining := filterByTerminal(allocs)
		a.result.desiredTGUpdates[group] = &structs.DesiredUpdates{
			Stop: a.filterAndStopAll(remaining),
		}
	}
}
// filterAndStopAll stops all allocations in an allocSet. This is useful when
// stopping an entire job or task group. Lost allocations are stopped with the
// lost client status; every other category is stopped as not needed. It
// returns the size of the given set.
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
	untainted, migrate, lost, disconnecting, reconnecting, _ := set.filterByTainted(
		a.taintedNodes, a.supportsDisconnectedClients, a.now)

	for _, notNeeded := range []allocSet{untainted, migrate} {
		a.markStop(notNeeded, "", allocNotNeeded)
	}

	a.markStop(lost, structs.AllocClientStatusLost, allocLost)

	for _, notNeeded := range []allocSet{disconnecting, reconnecting} {
		a.markStop(notNeeded, "", allocNotNeeded)
	}

	return uint64(len(set))
}
// markStop appends every allocation in the set to the stop results, using the
// given client status and status description for each of them.
func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
	// All entries share the status fields; only the alloc differs.
	entry := allocStopResult{
		clientStatus:      clientStatus,
		statusDescription: statusDescription,
	}
	for id := range allocs {
		entry.alloc = allocs[id]
		a.result.stop = append(a.result.stop, entry)
	}
}
// markDelayed behaves like markStop, but additionally attaches a
// followupEvalID to each stop result so the stopped alloc can be updated with
// its delayed rescheduling evalID. The ID is looked up in followupEvals by
// allocation ID and is empty when the map has no entry for the allocation.
func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
	pending := make([]allocStopResult, 0, len(allocs))
	for _, alloc := range allocs {
		pending = append(pending, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
			followupEvalID:    followupEvals[alloc.ID],
		})
	}
	a.result.stop = append(a.result.stop, pending...)
}
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
//
// The desired changes for the group are recorded in
// a.result.desiredTGUpdates[groupName], and the placements, stops and updates
// that are computed along the way are appended to a.result.
func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
	// Create the desired update object for the group
	desiredChanges := new(structs.DesiredUpdates)
	a.result.desiredTGUpdates[groupName] = desiredChanges

	// Get the task group. The task group may be nil if the job was updated such
	// that the task group no longer exists
	tg := a.job.LookupTaskGroup(groupName)

	// If the task group is nil, then the task group has been removed so all we
	// need to do is stop everything
	if tg == nil {
		desiredChanges.Stop = a.filterAndStopAll(all)
		return true
	}

	dstate, existingDeployment := a.initializeDeploymentState(groupName, tg)

	// Filter allocations that do not need to be considered because they are
	// from an older job version and are terminal.
	all, ignore := a.filterOldTerminalAllocs(all)
	desiredChanges.Ignore += uint64(len(ignore))

	// Stop the canaries that are no longer needed and capture the current set
	// of canaries along with the updated total set of allocs.
	canaries, all := a.cancelUnneededCanaries(all, desiredChanges)

	// Determine what set of allocations are on tainted nodes
	untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
	desiredChanges.Ignore += uint64(len(ignore))

	// Determine what set of terminal allocations need to be rescheduled
	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

	// Determine what set of disconnecting allocations need to be rescheduled
	_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
	rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)

	// Find delays for any lost allocs that have stop_after_client_disconnect
	lostLater := lost.delayByStopAfterClientDisconnect()
	lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)

	// Find delays for any disconnecting allocs that have max_client_disconnect,
	// create followup evals, and update the ClientStatus to unknown.
	timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
	// Merge disconnecting with the stop_after_client_disconnect set into the
	// lostLaterEvals so that computeStop can add them to the stop set.
	lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)

	// Create batched follow-up evaluations for allocations that are
	// reschedulable later and mark the allocations for in place updating
	a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name)

	// Create a structure for choosing names. Seed with the taken names
	// which is the union of untainted, rescheduled, allocs on migrating
	// nodes, and allocs on down nodes (includes canaries)
	nameIndex := newAllocNameIndex(a.jobID, groupName, tg.Count, untainted.union(migrate, rescheduleNow, lost))

	// Stop any unneeded allocations and update the untainted set to not
	// include stopped allocations.
	isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
	desiredChanges.Stop += uint64(len(stop))
	untainted = untainted.difference(stop)

	// Validate and add reconnecting allocs to the plan so that they will be logged.
	a.computeReconnecting(reconnecting)
	// NOTE(review): a.result.reconnectUpdates is shared by all task groups, so
	// this adds the running total rather than only this group's share —
	// confirm this is intended for jobs with several groups.
	desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))

	// Do inplace upgrades where possible and capture the set of upgrades that
	// need to be done destructively.
	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
	desiredChanges.Ignore += uint64(len(ignore))
	desiredChanges.InPlaceUpdate += uint64(len(inplace))
	if !existingDeployment {
		dstate.DesiredTotal += len(destructive) + len(inplace)
	}

	// Remove the canaries now that we have handled rescheduling so that we do
	// not consider them when making placement decisions.
	if isCanarying {
		untainted = untainted.difference(canaries)
	}
	requiresCanaries := a.requiresCanaries(tg, dstate, destructive, canaries)
	if requiresCanaries {
		a.computeCanaries(tg, dstate, destructive, canaries, desiredChanges, nameIndex)
	}

	// Determine how many non-canary allocs we can place. isCanarying is
	// recomputed because computeCanaries may have set dstate.DesiredCanaries.
	isCanarying = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	underProvisionedBy := a.computeUnderProvisionedBy(tg, untainted, destructive, migrate, isCanarying)

	// Place if:
	// * The deployment is not paused or failed
	// * Not placing any canaries
	// * If there are any canaries that they have been promoted
	// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
	// * An alloc was lost
	// * There is not a corresponding reconnecting alloc.
	var place []allocPlaceResult
	if len(lostLater) == 0 {
		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
		if !existingDeployment {
			dstate.DesiredTotal += len(place)
		}
	}

	// deploymentPlaceReady tracks whether the deployment is in a state where
	// placements can be made without any other consideration.
	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying

	underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, rescheduleNow, lost, underProvisionedBy)

	// Destructive updates are only applied when the deployment is place
	// ready; otherwise they are counted as ignored.
	if deploymentPlaceReady {
		a.computeDestructiveUpdates(destructive, underProvisionedBy, desiredChanges, tg)
	} else {
		desiredChanges.Ignore += uint64(len(destructive))
	}

	a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
	a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)

	deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
		migrate, rescheduleNow, place, rescheduleLater, requiresCanaries)

	return deploymentComplete
}
// initializeDeploymentState returns the deployment state to use for the task
// group and whether that state already exists in the current deployment.
//
// When the current deployment has no state for the group, a new state is
// returned; it inherits AutoRevert, AutoPromote and ProgressDeadline from the
// group's update strategy when that strategy is not empty.
func (a *allocReconciler) initializeDeploymentState(group string, tg *structs.TaskGroup) (*structs.DeploymentState, bool) {
	if a.deployment != nil {
		if state, found := a.deployment.TaskGroups[group]; found {
			return state, true
		}
	}

	fresh := &structs.DeploymentState{}
	if !tg.Update.IsEmpty() {
		fresh.AutoRevert = tg.Update.AutoRevert
		fresh.AutoPromote = tg.Update.AutoPromote
		fresh.ProgressDeadline = tg.Update.ProgressDeadline
	}
	return fresh, false
}
// requiresCanaries reports whether canaries need to be created: the group has
// an update strategy, there are destructive updates, fewer canaries exist
// than the strategy desires, and the canaries have not been promoted.
func (a *allocReconciler) requiresCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState, destructive, canaries allocSet) bool {
	if tg.Update == nil || len(destructive) == 0 {
		return false
	}
	if len(canaries) >= tg.Update.Canary {
		return false
	}
	promoted := dstate != nil && dstate.Promoted
	return !promoted
}
// computeCanaries records the desired canary count on the deployment state
// and, unless the deployment is paused or failed, adds a canary placement for
// every canary that is still missing.
func (a *allocReconciler) computeCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState,
	destructive, canaries allocSet, desiredChanges *structs.DesiredUpdates, nameIndex *allocNameIndex) {

	wanted := tg.Update.Canary
	dstate.DesiredCanaries = wanted

	if a.deploymentPaused || a.deploymentFailed {
		return
	}

	desiredChanges.Canary += uint64(wanted - len(canaries))
	names := nameIndex.NextCanaries(uint(desiredChanges.Canary), canaries, destructive)
	for i := range names {
		a.result.place = append(a.result.place, allocPlaceResult{
			name:      names[i],
			canary:    true,
			taskGroup: tg,
		})
	}
}
// filterOldTerminalAllocs splits off the allocations that should be ignored
// because they are terminal and belong to a previous job version. Only batch
// jobs are filtered; for any other job the input set is returned unchanged
// together with a nil ignore set.
func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
	if !a.batch {
		return all, nil
	}

	// Work on a separate set so the caller's set is not modified.
	filtered = filtered.union(all)
	ignore = make(allocSet)

	// Ignore terminal batch jobs from older versions
	for id, alloc := range filtered {
		fromOlderJob := alloc.Job.Version < a.job.Version ||
			alloc.Job.CreateIndex < a.job.CreateIndex
		if !fromOlderJob || !alloc.TerminalStatus() {
			continue
		}
		delete(filtered, id)
		ignore[id] = alloc
	}

	return filtered, ignore
}
// cancelUnneededCanaries handles the canaries for the group: it stops the
// ones that are no longer needed and returns the current set of canaries
// along with the updated total set of allocs for the group.
//
// Canaries that are stopped here are added to desiredChanges.Stop. Canaries
// of the current deployment that need migrating or are lost are stopped as
// well and removed from the returned sets.
func (a *allocReconciler) cancelUnneededCanaries(original allocSet, desiredChanges *structs.DesiredUpdates) (canaries, all allocSet) {
	// unpromotedCanaries lists the placed canary IDs of every task group in
	// the deployment that has not been promoted.
	unpromotedCanaries := func(d *structs.Deployment) []string {
		var ids []string
		for _, dstate := range d.TaskGroups {
			if dstate.Promoted {
				continue
			}
			ids = append(ids, dstate.PlacedCanaries...)
		}
		return ids
	}

	// Stop any canary from an older deployment or from a failed one
	var stop []string
	if a.oldDeployment != nil {
		stop = append(stop, unpromotedCanaries(a.oldDeployment)...)
	}
	if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
		stop = append(stop, unpromotedCanaries(a.deployment)...)
	}

	// stopSet holds the allocs of this group that match the canary IDs
	// gathered above.
	stopSet := original.fromKeys(stop)
	a.markStop(stopSet, "", allocNotNeeded)
	desiredChanges.Stop += uint64(len(stopSet))
	all = original.difference(stopSet)

	// Without a current deployment there are no current canaries.
	if a.deployment == nil {
		return nil, all
	}

	// Capture the current set of canaries and handle any needed migrations by
	// simply stopping them.
	var canaryIDs []string
	for _, dstate := range a.deployment.TaskGroups {
		canaryIDs = append(canaryIDs, dstate.PlacedCanaries...)
	}

	untainted, migrate, lost, _, _, _ := all.fromKeys(canaryIDs).filterByTainted(
		a.taintedNodes, a.supportsDisconnectedClients, a.now)
	a.markStop(migrate, "", allocMigrating)
	a.markStop(lost, structs.AllocClientStatusLost, allocLost)

	return untainted, all.difference(migrate, lost)
}
// computeUnderProvisionedBy returns the number of allocs that still need to
// be placed for a particular group. The inputs are the group definition, the
// untainted, destructive, and migrate allocation sets, and whether we are in
// a canary state. The result is never negative when MaxParallel and Count are
// not negative.
func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, untainted, destructive, migrate allocSet, isCanarying bool) int {
	switch {
	case group.Update.IsEmpty() || len(destructive)+len(migrate) == 0:
		// No update strategy, or nothing is migrating or being replaced:
		// allow as many as defined in group.Count.
		return group.Count
	case a.deployment == nil:
		// Without a deployment, allow MaxParallel placements.
		return group.Update.MaxParallel
	case a.deploymentPaused || a.deploymentFailed || isCanarying:
		// Paused, failed, or un-promoted canaries: do not create anything.
		return 0
	}

	// Count the allocs of this deployment that are not yet healthy; each one
	// consumes a slot of the MaxParallel budget.
	notYetHealthy := 0
	partOf, _ := untainted.filterByDeployment(a.deployment.ID)
	for _, alloc := range partOf {
		status := alloc.DeploymentStatus
		switch {
		case status.IsUnhealthy():
			// An unhealthy allocation means nothing else should happen.
			return 0
		case !status.IsHealthy():
			// Not yet explicitly set to healthy (nil).
			notYetHealthy++
		}
	}

	// The remainder can drop below zero when the job was changed such that it
	// required destructive changes and the count was scaled up.
	remaining := group.Update.MaxParallel - notYetHealthy
	if remaining < 0 {
		return 0
	}
	return remaining
}
// computePlacements returns the set of allocations to place given the group
// definition and the untainted, migrating, reschedule, lost and reconnecting
// allocations of the group.
//
// The result lists one placement per rescheduled alloc first, then
// replacements for lost allocs until the group count is reached, and finally
// new names for whatever is still missing. Placements will meet or exceed
// group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
	isCanarying bool) []allocPlaceResult {

	// replacementFor builds the placement that supersedes an existing alloc.
	replacementFor := func(prev *structs.Allocation, isReschedule, isLost bool) allocPlaceResult {
		return allocPlaceResult{
			name:               prev.Name,
			taskGroup:          group,
			previousAlloc:      prev,
			reschedule:         isReschedule,
			canary:             prev.DeploymentStatus.IsCanary(),
			downgradeNonCanary: isCanarying && !prev.DeploymentStatus.IsCanary(),
			minJobVersion:      prev.Job.Version,
			lost:               isLost,
		}
	}

	// Add rescheduled placement results
	var placements []allocPlaceResult
	for _, alloc := range reschedule {
		placements = append(placements, replacementFor(alloc, true, false))
	}

	// Count what already occupies a slot of the group. Reconnecting allocs
	// count unless their reconnect failed.
	failedReconnects := reconnecting.filterByFailedReconnect()
	occupied := len(untainted) + len(migrate) + len(reschedule) +
		len(reconnecting) - len(failedReconnects)

	// Add replacements for lost allocs up to group.Count
	for _, alloc := range lost {
		if occupied >= group.Count {
			// Reached desired count, do not replace remaining lost allocs
			break
		}
		occupied++
		placements = append(placements, replacementFor(alloc, false, true))
	}

	// Add remaining placement results
	if missing := group.Count - occupied; missing > 0 {
		for _, name := range nameIndex.Next(uint(missing)) {
			placements = append(placements, allocPlaceResult{
				name:               name,
				taskGroup:          group,
				downgradeNonCanary: isCanarying,
			})
		}
	}

	return placements
}
// computeReplacements either applies the placements calculated by computePlacements,
// or computes more placements based on whether the deployment is ready for placement
// and if the placement is already rescheduling or part of a failed deployment.
// The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying.
// It returns the number of allocs still needed.
//
// Placements are appended to a.result.place, the previous allocs that get
// replaced are appended to a.result.stop, and the Place and Stop counters of
// desiredChanges are updated to match.
func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates,
	place []allocPlaceResult, rescheduleNow, lost allocSet, underProvisionedBy int) int {

	// Disconnecting allocs are not failing, but are included in rescheduleNow.
	// Create a new set that only includes the actual failures and compute
	// replacements based off that.
	failed := make(allocSet)
	for id, alloc := range rescheduleNow {
		if _, ok := a.result.disconnectUpdates[id]; !ok {
			failed[id] = alloc
		}
	}

	// If the deployment is place ready, apply all placements and return
	if deploymentPlaceReady {
		desiredChanges.Place += uint64(len(place))
		// This relies on the computePlacements having built this set, which in
		// turn relies on len(lostLater) == 0.
		a.result.place = append(a.result.place, place...)

		a.markStop(failed, "", allocRescheduled)
		desiredChanges.Stop += uint64(len(failed))

		// Every applied placement uses up one of the remaining slots, without
		// letting the remainder drop below zero.
		min := helper.IntMin(len(place), underProvisionedBy)
		underProvisionedBy -= min
		return underProvisionedBy
	}

	// We do not want to place additional allocations but in the case we
	// have lost allocations or allocations that require rescheduling now,
	// we do so regardless to avoid odd user experiences.

	// If allocs have been lost, determine the number of replacements that are needed
	// and add placements to the result for the lost allocs.
	// NOTE(review): this takes the first `allowed` entries of place, and
	// computePlacements lists rescheduling placements ahead of the lost
	// replacements — confirm that the intended entries are selected and that
	// the loop below cannot add the same placement a second time.
	if len(lost) != 0 {
		allowed := helper.IntMin(len(lost), len(place))
		desiredChanges.Place += uint64(allowed)
		a.result.place = append(a.result.place, place[:allowed]...)
	}

	// if no failures or there are no pending placements return.
	if len(rescheduleNow) == 0 || len(place) == 0 {
		return underProvisionedBy
	}

	// Handle rescheduling of failed allocations even if the deployment is failed.
	// If the placement is rescheduling, and not part of a failed deployment, add
	// to the place set. Add the previous alloc to the stop set unless it is disconnecting.
	for _, p := range place {
		prev := p.PreviousAllocation()
		partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID

		if !partOfFailedDeployment && p.IsRescheduling() {
			a.result.place = append(a.result.place, p)
			desiredChanges.Place++

			// NOTE(review): prev is dereferenced without a nil check here;
			// this assumes a rescheduling placement always carries a previous
			// allocation — confirm.
			_, prevIsDisconnecting := a.result.disconnectUpdates[prev.ID]
			if prevIsDisconnecting {
				continue
			}

			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             prev,
				statusDescription: allocRescheduled,
			})
			desiredChanges.Stop++
		}
	}

	return underProvisionedBy
}
// computeDestructiveUpdates stages destructive updates for as many of the
// given allocs as underProvisionedBy allows, picking them in name order. The
// allocs that do not fit are counted as ignored.
func (a *allocReconciler) computeDestructiveUpdates(destructive allocSet, underProvisionedBy int,
	desiredChanges *structs.DesiredUpdates, tg *structs.TaskGroup) {

	limit := helper.IntMin(len(destructive), underProvisionedBy)
	skipped := len(destructive) - limit

	desiredChanges.DestructiveUpdate += uint64(limit)
	desiredChanges.Ignore += uint64(skipped)

	chosen := destructive.nameOrder()[:limit]
	for _, alloc := range chosen {
		update := allocDestructiveResult{
			placeName:             alloc.Name,
			placeTaskGroup:        tg,
			stopAlloc:             alloc,
			stopStatusDescription: allocUpdating,
		}
		a.result.destructiveUpdate = append(a.result.destructiveUpdate, update)
	}
}
// computeMigrations stages a stop and a replacement placement for every
// migrating alloc, in name order, and adds the number of migrating allocs to
// desiredChanges.Migrate.
func (a *allocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdates, migrate allocSet, tg *structs.TaskGroup, isCanarying bool) {
	desiredChanges.Migrate += uint64(len(migrate))

	for _, alloc := range migrate.nameOrder() {
		stopped := allocStopResult{
			alloc:             alloc,
			statusDescription: allocMigrating,
		}
		a.result.stop = append(a.result.stop, stopped)

		// The replacement keeps the name of the alloc it migrates from.
		replacement := allocPlaceResult{
			name:               alloc.Name,
			canary:             alloc.DeploymentStatus.IsCanary(),
			taskGroup:          tg,
			previousAlloc:      alloc,
			downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
		}
		a.result.place = append(a.result.place, replacement)
	}
}
// createDeployment attaches the task group's deployment state to the job's
// deployment, creating that deployment first when no earlier task group has
// done so. A newly created deployment is also stored in a.result.deployment.
//
// Nothing happens when the group already has state in an existing deployment,
// when the update strategy is empty, when nothing is desired for the group,
// or when the job version has run before and the spec is not being updated.
func (a *allocReconciler) createDeployment(groupName string, strategy *structs.UpdateStrategy,
	existingDeployment bool, dstate *structs.DeploymentState, all, destructive allocSet) {
	// Guard the simple cases that require no computation first.
	if existingDeployment ||
		strategy.IsEmpty() ||
		dstate.DesiredTotal == 0 {
		return
	}

	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0

	// hadRunning is set when any alloc already belongs to this exact job
	// version.
	hadRunning := false
	for _, alloc := range all {
		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
			hadRunning = true
			break
		}
	}

	// Don't create a deployment if it's not the first time running the job
	// and there are no updates to the spec.
	if hadRunning && !updatingSpec {
		return
	}

	// A previous group may have made the deployment already. If not create one.
	if a.deployment == nil {
		a.deployment = structs.NewDeployment(a.job, a.evalPriority)
		// In multiregion jobs, most deployments start in a pending state.
		// Periodic or parameterized jobs are the exception. This must be the
		// same condition computeDeploymentPaused uses to treat the upcoming
		// deployment as paused; otherwise a pending deployment could be
		// created that the reconciler still makes placements for.
		if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
			a.deployment.Status = structs.DeploymentStatusPending
			a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
		}
		a.result.deployment = a.deployment
	}

	// Attach the groups deployment state to the deployment
	a.deployment.TaskGroups[groupName] = dstate
}
// isDeploymentComplete reports whether the deployment is complete with
// regards to the task group.
//
// That is the case only when there is a deployment, no work of any kind is
// outstanding for the group, no canaries are required, and the group's
// deployment state (if the deployment has one) shows enough healthy allocs
// and promoted canaries.
func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, inplace, migrate, rescheduleNow allocSet,
	place []allocPlaceResult, rescheduleLater []*delayedRescheduleInfo, requiresCanaries bool) bool {

	outstanding := len(destructive) + len(inplace) + len(place) +
		len(migrate) + len(rescheduleNow) + len(rescheduleLater)
	if outstanding != 0 || requiresCanaries {
		return false
	}
	if a.deployment == nil {
		return false
	}

	// Final check to see if the deployment is complete is to ensure
	// everything is healthy
	dstate, tracked := a.deployment.TaskGroups[groupName]
	if !tracked {
		return true
	}

	// Make sure we have enough healthy allocs
	if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) {
		return false
	}

	// Make sure we are promoted if we have canaries
	if dstate.DesiredCanaries > 0 && !dstate.Promoted {
		return false
	}

	return true
}
// computeStop returns the set of allocations that are marked for stopping given
// the group definition, the set of allocations in various states and whether we
// are canarying.
//
// Two sets are returned: the allocations selected for stopping, and the
// reconnecting set with failed reconnects removed (and possibly further
// reduced by computeStopByReconnecting).
//
// Side effects visible in this function: stop results are appended to
// a.result.stop, entries are deleted from the migrate set, indexes are unset on
// nameIndex for stopped migrating allocs, and lost / failed-reconnect allocs
// are handed to a.markDelayed / a.markStop.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
// Mark all lost allocations for stop.
// NOTE(review): stop starts as a nil allocSet and is written to by key
// further down, so union presumably returns a freshly allocated set — confirm.
var stop allocSet
stop = stop.union(lost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
reconnecting = reconnecting.difference(failedReconnects)
// If we are still deploying or creating canaries, don't stop them
if isCanarying {
untainted = untainted.difference(canaries)
}
// Hot path the nothing to do case: remove is how many allocations exceed
// the group's desired count across the untainted, migrating and
// reconnecting sets.
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
if remove <= 0 {
return stop, reconnecting
}
// Filter out any terminal allocations from the untainted set
// This is so that we don't try to mark them as stopped redundantly
untainted = filterByTerminal(untainted)
// Prefer stopping any alloc that has the same name as the canaries if we
// are promoted
if !isCanarying && len(canaries) != 0 {
canaryNames := canaries.nameSet()
// Only non-canary untainted allocs are candidates here.
for id, alloc := range untainted.difference(canaries) {
if _, match := canaryNames[alloc.Name]; match {
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop, reconnecting
}
}
}
}
// Prefer selecting from the migrating set before stopping existing allocs
if len(migrate) != 0 {
// Build a name index over just the migrating allocs and ask it for
// `remove` names (presumably the highest-indexed ones — see Highest).
migratingNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
removeNames := migratingNames.Highest(uint(remove))
for id, alloc := range migrate {
if _, match := removeNames[alloc.Name]; !match {
continue
}
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
// The alloc is stopped rather than migrated, so drop it from the
// migrate set and release its index in the group's name index.
delete(migrate, id)
stop[id] = alloc
nameIndex.UnsetIndex(alloc.Index())
remove--
if remove == 0 {
return stop, reconnecting
}
}
}
// Handle allocs that might be able to reconnect.
if len(reconnecting) != 0 {
// computeStopByReconnecting mutates untainted, reconnecting and stop in
// place and returns how many allocations still need to be removed.
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop, reconnecting
}
}
// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
if _, ok := removeNames[alloc.Name]; ok {
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop, reconnecting
}
}
}
// It is possible that we didn't stop as many as we should have if there
// were allocations with duplicate names. Stop whatever untainted allocs
// remain, in map iteration order, until the excess is gone.
for id, alloc := range untainted {
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop, reconnecting
}
}
return stop, reconnecting
}
// computeStopByReconnecting moves allocations from either the untainted or reconnecting
// sets to the stop set and returns the number of allocations that still need to be removed.
//
// The untainted, reconnecting and stop sets are modified in place, and every
// stopped allocation is also appended to a.result.stop. remove is the number of
// allocations that still need stopping; the function returns as soon as it
// reaches zero.
func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
if remove == 0 {
return remove
}
for _, reconnectingAlloc := range reconnecting {
// Stop the reconnecting allocation outright if its desired status is
// not run, if a migrate / reschedule / force-reschedule transition has
// been requested, or if it belongs to an older job version or create
// index than the job being reconciled.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
reconnectingAlloc.Job.Version < a.job.Version ||
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
// Deleting the current key while ranging over a map is permitted in Go.
delete(reconnecting, reconnectingAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
continue
}
// Compare reconnecting to untainted and decide which to keep.
// NOTE(review): this inner loop does not break after a decision. If the
// reconnecting alloc is the one stopped and another untainted alloc
// shares its name, the same reconnecting alloc would be appended to
// a.result.stop again and remove decremented again — confirm names are
// unique within untainted at this point.
for _, untaintedAlloc := range untainted {
// Only allocations with the same name are compared; others are skipped.
if reconnectingAlloc.Name != untaintedAlloc.Name {
continue
}
// By default, we prefer stopping the replacement alloc unless
// the replacement has a higher metrics score.
stopAlloc := untaintedAlloc
deleteSet := untainted
untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
// Without score metadata on both sides no comparison is possible, so
// the pair is logged and left untouched.
if untaintedMaxScoreMeta == nil {
a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
continue
}
if reconnectingMaxScoreMeta == nil {
a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
continue
}
// The reconnecting alloc is stopped instead when the replacement runs
// a newer job version or create index, or has a strictly higher
// normalized score. Otherwise the replacement is stopped and tagged
// with the allocReconnected description.
statusDescription := allocNotNeeded
if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
} else {
statusDescription = allocReconnected
}
stop[stopAlloc.ID] = stopAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: stopAlloc,
statusDescription: statusDescription,
})
// Remove the loser from whichever set it came from.
delete(deleteSet, stopAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
}
}
return remove
}
// computeUpdates classifies every allocation in untainted by the kind of
// update it needs for the passed group, as decided by a.allocUpdateFn. Three
// sets keyed by allocation ID are returned:
//  1. ignore: allocations that require no upgrade
//  2. inplace: allocations that can be upgraded in-place. The updated
//     allocation returned by allocUpdateFn is appended to
//     a.result.inplaceUpdate as a side effect.
//  3. destructive: allocations that require a destructive update
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
	ignore = make(allocSet)
	inplace = make(allocSet)
	destructive = make(allocSet)

	for _, existing := range untainted {
		skip, replace, updated := a.allocUpdateFn(existing, a.job, group)

		switch {
		case skip:
			ignore[existing.ID] = existing
		case replace:
			destructive[existing.ID] = existing
		default:
			// Neither ignorable nor destructive: treat as an in-place update.
			inplace[existing.ID] = existing
			a.result.inplaceUpdate = append(a.result.inplaceUpdate, updated)
		}
	}

	return ignore, inplace, destructive
}
// createRescheduleLaterEvals creates batched followup evaluations for the
// allocations in rescheduleLater and records, for each of them, a copy of the
// allocation carrying the followup eval ID in a.result.attributeUpdates.
func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
	// The followup evals themselves are built the same way as for delayed
	// lost allocations.
	followupByAlloc := a.createLostLaterEvals(rescheduleLater, tgName)

	// Stamp a copy of each affected alloc with its followup eval ID.
	for id, followupID := range followupByAlloc {
		annotated := all[id].Copy()
		annotated.FollowupEvalID = followupID
		a.result.attributeUpdates[annotated.ID] = annotated
	}
}
// computeReconnecting selects the reconnecting allocations that should resume
// and records them in a.result.reconnectUpdates, keyed by allocation ID.
//
// An allocation is skipped when a migrate, reschedule or force-reschedule
// transition has been requested for it, when it belongs to an older job
// version or create index than the job being reconciled, when its desired
// status is not run, or when its client status is not running.
func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
	for _, candidate := range reconnecting {
		switch {
		// A user-defined DesiredTransition or an outdated job means the alloc
		// must not be resumed.
		case candidate.DesiredTransition.ShouldMigrate(),
			candidate.DesiredTransition.ShouldReschedule(),
			candidate.DesiredTransition.ShouldForceReschedule(),
			candidate.Job.Version < a.job.Version,
			candidate.Job.CreateIndex < a.job.CreateIndex:
			continue

		// The scheduler has already chosen a non-run DesiredStatus.
		case candidate.DesiredStatus != structs.AllocDesiredStatusRun:
			continue

		// The alloc is not running on the client, so don't reconnect it.
		case candidate.ClientStatus != structs.AllocClientStatusRunning:
			continue
		}

		a.result.reconnectUpdates[candidate.ID] = candidate
	}
}
// createLostLaterEvals creates batched followup evaluations with the WaitUntil
// field set for allocations that must be rescheduled at a later time. The
// followup evals are appended to a.result (via appendFollowupEvals) as a side
// effect, and a map of alloc IDs to their followup eval IDs is returned.
//
// rescheduleLater is sorted in place by reschedule time. Allocations whose
// reschedule time falls within batchedFailedAllocWindowSize of the first
// allocation in the current batch share that batch's evaluation; a later
// allocation starts a new batch. Every batch evaluation carries the same
// status description, so followup evals are labelled consistently regardless
// of which batch they belong to.
//
// An empty, non-nil map is returned when rescheduleLater is empty.
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName string) map[string]string {
	if len(rescheduleLater) == 0 {
		return map[string]string{}
	}

	// Sort by time so batches can be formed in a single forward pass.
	sort.Slice(rescheduleLater, func(i, j int) bool {
		return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
	})

	// nextReschedTime is the start of the current batch window.
	nextReschedTime := rescheduleLater[0].rescheduleTime

	// newBatchEval builds the followup eval for the batch that starts at the
	// current nextReschedTime. Using one constructor for every batch keeps the
	// evals identical apart from their ID and WaitUntil.
	newBatchEval := func() *structs.Evaluation {
		return &structs.Evaluation{
			ID:                uuid.Generate(),
			Namespace:         a.job.Namespace,
			Priority:          a.evalPriority,
			Type:              a.job.Type,
			TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
			JobID:             a.job.ID,
			JobModifyIndex:    a.job.ModifyIndex,
			Status:            structs.EvalStatusPending,
			StatusDescription: reschedulingFollowupEvalDesc,
			WaitUntil:         nextReschedTime,
		}
	}

	// Create a new eval for the first batch.
	eval := newBatchEval()
	evals := []*structs.Evaluation{eval}
	allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))

	for _, allocReschedInfo := range rescheduleLater {
		// The input is sorted, so once an alloc falls outside the current
		// window every following alloc does too: start a new batch.
		if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) >= batchedFailedAllocWindowSize {
			nextReschedTime = allocReschedInfo.rescheduleTime
			eval = newBatchEval()
			evals = append(evals, eval)
		}

		allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		emitRescheduleInfo(allocReschedInfo.alloc, eval)
	}

	a.appendFollowupEvals(tgName, evals)

	return allocIDToFollowupEvalID
}
// createTimeoutLaterEvals creates batched followup evaluations with the
// WaitUntil field set for the allocations in disconnecting. The evals are
// appended to a.result (via appendFollowupEvals) as a side effect, and for each
// allocation a copy marked with the unknown client status and its followup
// eval ID is stored in a.result.disconnectUpdates. It returns a map of alloc
// IDs to their associated followup eval IDs.
//
// An empty map is returned when disconnecting is empty, or when the timeout
// delays cannot be computed for every allocation (an error is logged in that
// case).
func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName string) map[string]string {
	if len(disconnecting) == 0 {
		return map[string]string{}
	}

	timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
	if err != nil || len(timeoutDelays) != len(disconnecting) {
		a.logger.Error("error computing disconnecting timeouts for task_group", "task_group", tgName, "err", err)
		return map[string]string{}
	}

	// Order the delays chronologically so batching is a single forward pass.
	sort.Slice(timeoutDelays, func(i, j int) bool {
		return timeoutDelays[i].rescheduleTime.Before(timeoutDelays[j].rescheduleTime)
	})

	// batchStart is the reschedule time that opened the current batch.
	batchStart := timeoutDelays[0].rescheduleTime

	// newBatchEval builds the followup eval for the batch opening at batchStart.
	newBatchEval := func() *structs.Evaluation {
		return &structs.Evaluation{
			ID:                uuid.Generate(),
			Namespace:         a.job.Namespace,
			Priority:          a.evalPriority,
			Type:              a.job.Type,
			TriggeredBy:       structs.EvalTriggerMaxDisconnectTimeout,
			JobID:             a.job.ID,
			JobModifyIndex:    a.job.ModifyIndex,
			Status:            structs.EvalStatusPending,
			StatusDescription: disconnectTimeoutFollowupEvalDesc,
			WaitUntil:         batchStart,
		}
	}

	eval := newBatchEval()
	evals := []*structs.Evaluation{eval}
	allocIDToFollowupEvalID := make(map[string]string, len(timeoutDelays))

	// The delays are sorted, so reschedule times only move further into the
	// future. Once one lands outside the batch window a new batch is opened.
	for _, info := range timeoutDelays {
		if info.rescheduleTime.Sub(batchStart) >= batchedFailedAllocWindowSize {
			batchStart = info.rescheduleTime
			eval = newBatchEval()
			evals = append(evals, eval)
		}

		allocIDToFollowupEvalID[info.allocID] = eval.ID
		emitRescheduleInfo(info.alloc, eval)

		// Record an updated copy of the alloc carrying the unknown
		// ClientStatus, the matching AllocState entry and the FollowupEvalID.
		unknownAlloc := info.alloc.Copy()
		unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
		unknownAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
		unknownAlloc.ClientDescription = allocUnknown
		unknownAlloc.FollowupEvalID = eval.ID
		a.result.disconnectUpdates[unknownAlloc.ID] = unknownAlloc
	}

	a.appendFollowupEvals(tgName, evals)

	return allocIDToFollowupEvalID
}
// appendFollowupEvals records followup evals for a task group in the
// a.result.desiredFollowupEvals map. Evals already stored for the group are
// kept, with the new ones appended after them.
func (a *allocReconciler) appendFollowupEvals(tgName string, evals []*structs.Evaluation) {
	merged := evals

	// Merge with any followup evals previously recorded for this group.
	if prior, found := a.result.desiredFollowupEvals[tgName]; found {
		merged = append(prior, evals...)
	}

	a.result.desiredFollowupEvals[tgName] = merged
}
// emitRescheduleInfo emits gauge metrics describing the rescheduling state of
// an allocation: the number of attempted reschedules and the limit, both taken
// from alloc.RescheduleInfo. When a followup evaluation is provided, its ID is
// added as a label and its WaitUntil time is emitted as a Unix timestamp.
func emitRescheduleInfo(alloc *structs.Allocation, followupEval *structs.Evaluation) {
	// Emit short-lived metrics data point. Note, these expire and stop emitting after about a minute.
	metricKey := func(leaf string) []string {
		return []string{"scheduler", "allocs", "reschedule", leaf}
	}

	labels := []metrics.Label{
		{Name: "alloc_id", Value: alloc.ID},
		{Name: "job", Value: alloc.JobID},
		{Name: "namespace", Value: alloc.Namespace},
		{Name: "task_group", Value: alloc.TaskGroup},
	}

	if followupEval != nil {
		labels = append(labels, metrics.Label{Name: "followup_eval_id", Value: followupEval.ID})
		waitUntil := float32(followupEval.WaitUntil.Unix())
		metrics.SetGaugeWithLabels(metricKey("wait_until"), waitUntil, labels)
	}

	attempted, availableAttempts := alloc.RescheduleInfo()
	metrics.SetGaugeWithLabels(metricKey("attempted"), float32(attempted), labels)
	metrics.SetGaugeWithLabels(metricKey("limit"), float32(availableAttempts), labels)
}