scheduler: fix reconciliation of reconnecting allocs (#16609)
When a disconnected client reconnects, the `allocReconciler` must find the allocations that were created to replace the original disconnected allocations. This search was performed over only a subset of the non-terminal untainted allocations, so if a replacement allocation was not in that subset the reconciler did not stop it, leaving the job in an inconsistent state.

The inconsistency was only resolved by a future job evaluation, but by that point the allocation was considered reconnected, so the reconnection-specific logic no longer applied, leading to unexpected outcomes.

This commit fixes the problem by running the reconciliation of reconnecting allocations earlier in the process, leaving the rest of the reconciler oblivious to reconnecting allocations. It also searches the full set of allocations for replacements, stopping them even when they are not in the `untainted` set.

The `SystemScheduler` is not affected by this bug because disconnected clients don't trigger replacements: every eligible client is already running an allocation.
parent 743414739d · commit 8070882c4b
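The core of the fix is how replacements are identified. Below is a minimal, runnable sketch of that check — not part of the commit; the `alloc` type, the `isReplacement` helper, and all values are made up for illustration, mirroring the condition used by the new `reconcileReconnecting` in the diff. It also shows why the old approach missed stopping a failed replacement: a failed allocation is not in the `untainted` set, the only set the old code searched.

```go
package main

import "fmt"

// alloc is a made-up, trimmed-down stand-in for structs.Allocation with
// only the fields the replacement check reads.
type alloc struct {
	ID           string
	Name         string
	CreateIndex  uint64
	ClientStatus string
}

// isReplacement mirrors the condition added in reconcileReconnecting: a
// replacement has the same name as the reconnecting allocation but a
// different ID and a higher CreateIndex.
func isReplacement(reconnecting, candidate alloc) bool {
	return candidate.ID != reconnecting.ID &&
		candidate.Name == reconnecting.Name &&
		candidate.CreateIndex > reconnecting.CreateIndex
}

func main() {
	original := alloc{ID: "a1", Name: "web[0]", CreateIndex: 10, ClientStatus: "running"}

	// A failed replacement is filtered out of the untainted set, so the old
	// code, which searched only untainted allocations, never found it and
	// never stopped it.
	failed := alloc{ID: "a2", Name: "web[0]", CreateIndex: 20, ClientStatus: "failed"}

	fmt.Println(isReplacement(original, failed)) // true: it still must be reconciled
}
```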
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fix reconciliation of reconnecting allocs when the replacement allocations are not running
+```
@@ -428,6 +428,34 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
 	desiredChanges.Ignore += uint64(len(ignore))
 
+	// If there are allocations reconnecting we need to reconcile them and
+	// their replacements first because there is specific logic when deciding
+	// which ones to keep that can only be applied when the client reconnects.
+	if len(reconnecting) > 0 {
+		// Pass all allocations because the replacements we need to find may be
+		// in any state, including themselves being reconnected.
+		reconnect, stop := a.reconcileReconnecting(reconnecting, all)
+
+		// Stop the reconciled allocations and remove them from the other sets
+		// since they have been already handled.
+		desiredChanges.Stop += uint64(len(stop))
+
+		untainted = untainted.difference(stop)
+		migrate = migrate.difference(stop)
+		lost = lost.difference(stop)
+		disconnecting = disconnecting.difference(stop)
+		reconnecting = reconnecting.difference(stop)
+		ignore = ignore.difference(stop)
+
+		// Validate and add reconnecting allocations to the plan so they are
+		// logged.
+		a.computeReconnecting(reconnect)
+
+		// The rest of the reconnecting allocations is now untainted and will
+		// be further reconciled below.
+		untainted = untainted.union(reconnect)
+	}
+
 	// Determine what set of terminal allocations need to be rescheduled
 	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
 
@@ -459,14 +487,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// Stop any unneeded allocations and update the untainted set to not
 	// include stopped allocations.
 	isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
+	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
 	desiredChanges.Stop += uint64(len(stop))
 	untainted = untainted.difference(stop)
 
-	// Validate and add reconnecting allocs to the plan so that they will be logged.
-	a.computeReconnecting(reconnecting)
-	desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
-
 	// Do inplace upgrades where possible and capture the set of upgrades that
 	// need to be done destructively.
 	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@@ -496,10 +520,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// * If there are any canaries that they have been promoted
 	// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
 	// * An alloc was lost
-	// * There is not a corresponding reconnecting alloc.
 	var place []allocPlaceResult
 	if len(lostLater) == 0 {
-		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
+		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
 		if !existingDeployment {
 			dstate.DesiredTotal += len(place)
 		}
@@ -705,7 +728,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
 //
 // Placements will meet or exceed group count.
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
-	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
+	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
 	isCanarying bool) []allocPlaceResult {
 
 	// Add rescheduled placement results
@@ -725,7 +748,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 	}
 
 	// Add replacements for disconnected and lost allocs up to group.Count
-	existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
+	existing := len(untainted) + len(migrate) + len(reschedule)
 
 	// Add replacements for lost
 	for _, alloc := range lost {
@@ -935,28 +958,22 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
 func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
-	untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
+	untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
 
 	// Mark all lost allocations for stop.
 	var stop allocSet
 	stop = stop.union(lost)
 	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
 
-	// Mark all failed reconnects for stop.
-	failedReconnects := reconnecting.filterByFailedReconnect()
-	stop = stop.union(failedReconnects)
-	a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
-	reconnecting = reconnecting.difference(failedReconnects)
-
 	// If we are still deploying or creating canaries, don't stop them
 	if isCanarying {
 		untainted = untainted.difference(canaries)
 	}
 
 	// Hot path the nothing to do case
-	remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
+	remove := len(untainted) + len(migrate) - group.Count
 	if remove <= 0 {
-		return stop, reconnecting
+		return stop
 	}
 
 	// Filter out any terminal allocations from the untainted set
@@ -978,7 +995,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 			remove--
 			if remove == 0 {
-				return stop, reconnecting
+				return stop
 			}
 		}
 	}
@@ -1002,19 +1019,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 			remove--
 			if remove == 0 {
-				return stop, reconnecting
+				return stop
 			}
 		}
 	}
 
-	// Handle allocs that might be able to reconnect.
-	if len(reconnecting) != 0 {
-		remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
-		if remove == 0 {
-			return stop, reconnecting
-		}
-	}
-
 	// Select the allocs with the highest count to remove
 	removeNames := nameIndex.Highest(uint(remove))
 	for id, alloc := range untainted {
@@ -1028,7 +1037,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 		remove--
 		if remove == 0 {
-			return stop, reconnecting
+			return stop
 		}
 	}
 }
@@ -1045,95 +1054,152 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 		remove--
 		if remove == 0 {
-			return stop, reconnecting
+			return stop
 		}
 	}
 
-	return stop, reconnecting
+	return stop
 }
 
-// computeStopByReconnecting moves allocations from either the untainted or reconnecting
-// sets to the stop set and returns the number of allocations that still need to be removed.
-func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
-	if remove == 0 {
-		return remove
-	}
+// reconcileReconnecting receives the set of allocations that are reconnecting
+// and all other allocations for the same group and determines which ones to
+// reconnect which ones or stop.
+//
+//   - Every reconnecting allocation MUST be present in one, and only one, of
+//     the returned set.
+//   - Every replacement allocation that is not preferred MUST be returned in
+//     the stop set.
+//   - Only reconnecting allocations are allowed to be present in the returned
+//     reconnect set.
+//   - If the reconnecting allocation is to be stopped, its replacements may
+//     not be present in any of the returned sets. The rest of the reconciler
+//     logic will handle them.
+func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
+	stop := make(allocSet)
+	reconnect := make(allocSet)
 
 	for _, reconnectingAlloc := range reconnecting {
-		// if the desired status is not run, or if the user-specified desired
+		// Stop allocations that failed to reconnect.
+		reconnectFailed := !reconnectingAlloc.ServerTerminalStatus() &&
+			reconnectingAlloc.ClientStatus == structs.AllocClientStatusFailed
+
+		if reconnectFailed {
+			stop[reconnectingAlloc.ID] = reconnectingAlloc
+			a.result.stop = append(a.result.stop, allocStopResult{
+				alloc:             reconnectingAlloc,
+				clientStatus:      structs.AllocClientStatusFailed,
+				statusDescription: allocRescheduled,
+			})
+			continue
+		}
+
+		// If the desired status is not run, or if the user-specified desired
 		// transition is not run, stop the reconnecting allocation.
-		if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
+		stopReconnecting := reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
 			reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
 			reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
 			reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
 			reconnectingAlloc.Job.Version < a.job.Version ||
-			reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
+			reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex
+
+		if stopReconnecting {
 			stop[reconnectingAlloc.ID] = reconnectingAlloc
 			a.result.stop = append(a.result.stop, allocStopResult{
 				alloc:             reconnectingAlloc,
 				statusDescription: allocNotNeeded,
 			})
-			delete(reconnecting, reconnectingAlloc.ID)
-
-			remove--
-			// if we've removed all we need to, stop iterating and return.
-			if remove == 0 {
-				return remove
-			}
 			continue
 		}
 
-		// Compare reconnecting to untainted and decide which to keep.
-		for _, untaintedAlloc := range untainted {
-			// If not a match by name and previous alloc continue
-			if reconnectingAlloc.Name != untaintedAlloc.Name {
+		// Find replacement allocations and decide which one to stop. A
+		// reconnecting allocation may have multiple replacements.
+		for _, replacementAlloc := range others {
+
+			// Skip allocations that are not a replacement of the one
+			// reconnecting. Replacement allocations have the same name but a
+			// higher CreateIndex and a different ID.
+			isReplacement := replacementAlloc.ID != reconnectingAlloc.ID &&
+				replacementAlloc.Name == reconnectingAlloc.Name &&
+				replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex
+
+			// Skip allocations that are server terminal.
+			// We don't want to replace a reconnecting allocation with one that
+			// is or will terminate and we don't need to stop them since they
+			// are already marked as terminal by the servers.
+			if !isReplacement || replacementAlloc.ServerTerminalStatus() {
 				continue
 			}
 
-			// By default, we prefer stopping the replacement alloc unless
-			// the replacement has a higher metrics score.
-			stopAlloc := untaintedAlloc
-			deleteSet := untainted
-			untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
-			reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
-
-			if untaintedMaxScoreMeta == nil {
-				a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
-				continue
-			}
-
-			if reconnectingMaxScoreMeta == nil {
-				a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
-				continue
-			}
-
-			statusDescription := allocNotNeeded
-			if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
-				untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
-				untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
-				stopAlloc = reconnectingAlloc
-				deleteSet = reconnecting
+			// Pick which allocation we want to keep.
+			keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
+			if keepAlloc == replacementAlloc {
+				// The replacement allocation is preferred, so stop the one
+				// reconnecting if not stopped yet.
+				if _, ok := stop[reconnectingAlloc.ID]; !ok {
+					stop[reconnectingAlloc.ID] = reconnectingAlloc
+					a.result.stop = append(a.result.stop, allocStopResult{
+						alloc:             reconnectingAlloc,
+						statusDescription: allocNotNeeded,
+					})
+				}
 			} else {
-				statusDescription = allocReconnected
+				// The reconnecting allocation is preferred, so stop this
+				// replacement.
+				stop[replacementAlloc.ID] = replacementAlloc
+				a.result.stop = append(a.result.stop, allocStopResult{
+					alloc:             replacementAlloc,
+					statusDescription: allocReconnected,
+				})
 			}
-
-			stop[stopAlloc.ID] = stopAlloc
-			a.result.stop = append(a.result.stop, allocStopResult{
-				alloc:             stopAlloc,
-				statusDescription: statusDescription,
-			})
-			delete(deleteSet, stopAlloc.ID)
-
-			remove--
-			// if we've removed all we need to, stop iterating and return.
-			if remove == 0 {
-				return remove
-			}
 		}
 	}
 
-	return remove
+	// Any reconnecting allocation not set to stop must be reconnected.
+	for _, alloc := range reconnecting {
+		if _, ok := stop[alloc.ID]; !ok {
+			reconnect[alloc.ID] = alloc
+		}
+	}
+
+	return reconnect, stop
+}
+
+// pickReconnectingAlloc returns the allocation to keep between the original
+// one that is reconnecting and one of its replacements.
+//
+// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
+// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
+// the original allocation when possible.
+func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
+	// Check if the replacement is newer.
+	// Always prefer the replacement if true.
+	replacementIsNewer := replacement.Job.Version > original.Job.Version ||
+		replacement.Job.CreateIndex > original.Job.CreateIndex
+	if replacementIsNewer {
+		return replacement
+	}
+
+	// Check if the replacement has better placement score.
+	// If any of the scores is not available, only pick the replacement if
+	// itself does have scores.
+	originalMaxScoreMeta := original.Metrics.MaxNormScore()
+	replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()
+
+	replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
+		(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
+			replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)
+
+	// Check if the replacement has better client status.
+	// Even with a better placement score make sure we don't replace a running
+	// allocation with one that is not.
+	replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
+	originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning
+
+	if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
+		return replacement
+	}
+
+	return original
 }
 
 // computeUpdates determines which allocations for the passed group require
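To make `pickReconnectingAlloc`'s precedence concrete, here is a toy walkthrough — not part of the commit; `toyAlloc` and `pick` are simplified stand-ins that ignore the `Job.CreateIndex` and nil-score cases the real function handles:

```go
package main

import "fmt"

// toyAlloc is a trimmed-down stand-in for structs.Allocation: jobVersion,
// score, and running stand in for Job.Version, Metrics.MaxNormScore(), and
// ClientStatus == running.
type toyAlloc struct {
	jobVersion uint64
	score      float64
	running    bool
}

// pick restates pickReconnectingAlloc's rules over the toy type.
func pick(original, replacement toyAlloc) string {
	// Rule 1: a replacement created from a newer job version always wins.
	if replacement.jobVersion > original.jobVersion {
		return "replacement"
	}
	// Rule 2: a better placement score wins only if it does not trade a
	// running allocation for one that is not running.
	if replacement.score > original.score && (replacement.running || !original.running) {
		return "replacement"
	}
	// Ties and everything else keep the original.
	return "original"
}

func main() {
	// A better score alone cannot displace a running original...
	fmt.Println(pick(
		toyAlloc{jobVersion: 3, score: 0.8, running: true},
		toyAlloc{jobVersion: 3, score: 0.9, running: false},
	)) // original

	// ...but a newer job version always prefers the replacement.
	fmt.Println(pick(
		toyAlloc{jobVersion: 2, score: 0.9, running: true},
		toyAlloc{jobVersion: 3, score: 0.1, running: false},
	)) // replacement
}
```

Ties keep the original, which is why the function is not commutative: swapping the arguments can change the answer.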
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/go-set"
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -5328,6 +5329,9 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 		nodeStatusDisconnected       bool
 		replace                      bool
 		failReplacement              bool
+		taintReplacement             bool
+		disconnectReplacement        bool
+		replaceFailedReplacement     bool
 		shouldStopOnDisconnectedNode bool
 		maxDisconnect                *time.Duration
 		expected                     *resultExpectation
@@ -5449,6 +5453,66 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:                    "keep-original-alloc-and-stop-failed-replacement",
+			allocCount:              3,
+			replace:                 true,
+			failReplacement:         true,
+			disconnectedAllocCount:  2,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 2,
+				stop:             2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 3,
+						Stop:   2,
+					},
+				},
+			},
+		},
+		{
+			name:                    "keep-original-and-stop-reconnecting-replacement",
+			allocCount:              2,
+			replace:                 true,
+			disconnectReplacement:   true,
+			disconnectedAllocCount:  1,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 1,
+				stop:             1,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 2,
+						Stop:   1,
+					},
+				},
+			},
+		},
+		{
+			name:                    "keep-original-and-stop-tainted-replacement",
+			allocCount:              3,
+			replace:                 true,
+			taintReplacement:        true,
+			disconnectedAllocCount:  2,
+			disconnectedAllocStatus: structs.AllocClientStatusRunning,
+			disconnectedAllocStates: disconnectAllocState,
+			serverDesiredStatus:     structs.AllocDesiredStatusRun,
+			expected: &resultExpectation{
+				reconnectUpdates: 2,
+				stop:             2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Ignore: 3,
+						Stop:   2,
+					},
+				},
+			},
+		},
 		{
 			name:       "stop-original-alloc-with-old-job-version",
 			allocCount: 5,
@@ -5490,14 +5554,15 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			},
 		},
 		{
-			name:                         "stop-original-alloc-with-old-job-version-and-failed-replacements",
+			name:                         "stop-original-alloc-with-old-job-version-and-failed-replacements-replaced",
 			allocCount:                   5,
 			replace:                      true,
+			failReplacement:              true,
+			replaceFailedReplacement:     true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
 			disconnectedAllocStates:      disconnectAllocState,
 			serverDesiredStatus:          structs.AllocDesiredStatusRun,
-			failReplacement:              true,
 			shouldStopOnDisconnectedNode: true,
 			jobVersionIncrement:          1,
 			expected: &resultExpectation{
@@ -5530,6 +5595,28 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:                         "stop-failed-original-and-failed-replacements-and-place-new",
+			allocCount:                   5,
+			replace:                      true,
+			failReplacement:              true,
+			disconnectedAllocCount:       2,
+			disconnectedAllocStatus:      structs.AllocClientStatusFailed,
+			disconnectedAllocStates:      disconnectAllocState,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
+			shouldStopOnDisconnectedNode: true,
+			expected: &resultExpectation{
+				stop:  4,
+				place: 2,
+				desiredTGUpdates: map[string]*structs.DesiredUpdates{
+					"web": {
+						Stop:   4,
+						Place:  2,
+						Ignore: 3,
+					},
+				},
+			},
+		},
 		{
 			name:       "stop-expired-allocs",
 			allocCount: 5,
@@ -5585,6 +5672,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			// Create resumable allocs
 			job, allocs := buildResumableAllocations(tc.allocCount, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
 
+			origAllocs := set.New[string](len(allocs))
+			for _, alloc := range allocs {
+				origAllocs.Insert(alloc.ID)
+			}
+
 			if tc.isBatch {
 				job.Type = structs.JobTypeBatch
 			}
@@ -5623,6 +5715,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 					replacement.PreviousAllocation = alloc.ID
 					replacement.AllocStates = nil
 					replacement.TaskStates = nil
+					replacement.CreateIndex += 1
 					alloc.NextAllocation = replacement.ID
 
 					if tc.jobVersionIncrement != 0 {
@@ -5631,19 +5724,33 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 					if tc.nodeScoreIncrement != 0 {
 						replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement
 					}
+					if tc.taintReplacement {
+						replacement.DesiredTransition.Migrate = pointer.Of(true)
+					}
+					if tc.disconnectReplacement {
+						replacement.AllocStates = tc.disconnectedAllocStates
+					}
 
-					replacements = append(replacements, replacement)
 					// If we want to test intermediate replacement failures simulate that.
 					if tc.failReplacement {
 						replacement.ClientStatus = structs.AllocClientStatusFailed
-						nextReplacement := replacement.Copy()
-						nextReplacement.ID = uuid.Generate()
-						nextReplacement.ClientStatus = structs.AllocClientStatusRunning
-						nextReplacement.PreviousAllocation = replacement.ID
-						replacement.NextAllocation = nextReplacement.ID
-						replacements = append(replacements, nextReplacement)
+
+						if tc.replaceFailedReplacement {
+							nextReplacement := replacement.Copy()
+							nextReplacement.ID = uuid.Generate()
+							nextReplacement.ClientStatus = structs.AllocClientStatusRunning
+							nextReplacement.DesiredStatus = structs.AllocDesiredStatusRun
+							nextReplacement.PreviousAllocation = replacement.ID
+							nextReplacement.CreateIndex += 1
+
+							replacement.NextAllocation = nextReplacement.ID
+							replacement.DesiredStatus = structs.AllocDesiredStatusStop
+
+							replacements = append(replacements, nextReplacement)
+						}
 					}
 
+					replacements = append(replacements, replacement)
 				}
 
 				allocs = append(allocs, replacements...)
@@ -5661,6 +5768,11 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			assertResults(t, results, tc.expected)
 
 			for _, stopResult := range results.stop {
+				// Skip replacement allocs.
+				if !origAllocs.Contains(stopResult.alloc.ID) {
+					continue
+				}
+
 				if tc.shouldStopOnDisconnectedNode {
 					require.Equal(t, testNode.ID, stopResult.alloc.NodeID)
 				} else {
@@ -520,19 +520,6 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
 	return
 }
 
-// filterByFailedReconnect filters allocation into a set that have failed on the
-// client but do not have a terminal status at the server so that they can be
-// marked as stop at the server.
-func (a allocSet) filterByFailedReconnect() allocSet {
-	failed := make(allocSet)
-	for _, alloc := range a {
-		if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {
-			failed[alloc.ID] = alloc
-		}
-	}
-	return failed
-}
-
 // delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a
 // stop_after_client_disconnect configured
 func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {