backport of commit e8efe2d251bf3628f13c7eb3ce2422eb7e5b85f6 (#18884)

Co-authored-by: Juana De La Cuesta <juanita.delacuestamorales@hashicorp.com>
This commit is contained in:
hc-github-team-nomad-core 2023-10-27 10:20:53 -05:00 committed by GitHub
parent c21331bc21
commit 50c9af53b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 560 additions and 260 deletions

View File

@ -134,6 +134,7 @@ const (
)
const (
// SystemInitializationType is used for messages that initialize parts of
// the system, such as the state store. These messages are not included in
// the event stream.
@ -10730,6 +10731,12 @@ func (a *Allocation) MigrateStrategy() *MigrateStrategy {
func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
failTime := a.LastEventTime()
reschedulePolicy := a.ReschedulePolicy()
//If reschedule is disabled, return early
if reschedulePolicy.Attempts == 0 && !reschedulePolicy.Unlimited {
return time.Time{}, false
}
if a.DesiredStatus == AllocDesiredStatusStop || a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil {
return time.Time{}, false
}
@ -10749,16 +10756,16 @@ func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *Re
return nextRescheduleTime, rescheduleEligible
}
// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
// NextRescheduleTimeByTime works like NextRescheduleTime but allows callers
// specify a failure time. Useful for things like determining whether to reschedule
// an alloc on a disconnected node.
func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
reschedulePolicy := a.ReschedulePolicy()
if reschedulePolicy == nil {
return time.Time{}, false
}
return a.nextRescheduleTime(failTime, reschedulePolicy)
return a.nextRescheduleTime(t, reschedulePolicy)
}
// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
@ -11098,7 +11105,7 @@ func (a *Allocation) Expired(now time.Time) bool {
}
expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
return now.UTC().After(expiry) || now.UTC().Equal(expiry)
return expiry.Sub(now) <= 0
}
// LastUnknown returns the timestamp for the last time the allocation

View File

@ -786,7 +786,7 @@ SUBMIT:
}
return err
} else {
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval))
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval), "waitUntil", log.Fmt("%#v", eval.WaitUntil.String()))
w.backoffReset()
}
return nil

View File

@ -6944,6 +6944,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}
nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
@ -6953,16 +6954,21 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")
// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)
// Two followup delayed eval created
require.Len(t, h.CreateEvals, 2)
followUpEval1 := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
require.Equal(t, "pending", followUpEval1.Status)
require.NotEmpty(t, followUpEval1.WaitUntil)
// Insert eval in the state store
followUpEval2 := h.CreateEvals[1]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
require.Equal(t, "pending", followUpEval2.Status)
require.NotEmpty(t, followUpEval2.WaitUntil)
// Insert eval1 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval.ID)
found, err := h.State.EvalByID(nil, followUpEval1.ID)
if err != nil {
return false, err
}
@ -6976,12 +6982,34 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
return true, nil
}, func(err error) {
require.NoError(t, err)
})
// Insert eval2 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval2.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}
require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)
return true, nil
}, func(err error) {
require.NoError(t, err)
})
// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.
for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}
@ -6991,6 +7019,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.NoError(t, err, "plan.NodeUpdate")
// Validate that the StateStore Upsert applied the ClientStatus we specified.
for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)

View File

@ -230,6 +230,7 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
groupComplete := a.computeGroup(group, as)
complete = complete && groupComplete
}
return complete
}
@ -398,6 +399,7 @@ func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescr
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Create the desired update object for the group
desiredChanges := new(structs.DesiredUpdates)
a.result.desiredTGUpdates[groupName] = desiredChanges
@ -426,6 +428,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
// which ones to keep that can only be applied when the client reconnects.
@ -454,20 +459,28 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted = untainted.union(reconnect)
}
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled now,
// which ones later and which ones can't be rescheduled at all.
timeoutLaterEvals := map[string]string{}
if len(disconnecting) > 0 {
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled
_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)
untainted = untainted.union(untaintedDisconnecting)
rescheduleLater = append(rescheduleLater, laterDisconnecting...)
// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
timeoutLaterEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
}
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLaterEvals := map[string]string{}
lostLater := []*delayedRescheduleInfo{}
if len(lost) > 0 {
lostLater = lost.delayByStopAfterClientDisconnect()
lostLaterEvals = a.createLostLaterEvals(lostLater, tg.Name)
}
// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
@ -486,13 +499,15 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
ignoreUpdates, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignoreUpdates))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
dstate.DesiredTotal += len(destructive) + len(inplace)
@ -796,7 +811,8 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
// replacements based off that.
failed := make(allocSet)
for id, alloc := range rescheduleNow {
if _, ok := a.result.disconnectUpdates[id]; !ok {
_, ok := a.result.disconnectUpdates[id]
if !ok && alloc.ClientStatus != structs.AllocClientStatusUnknown {
failed[id] = alloc
}
}
@ -968,8 +984,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
untainted = untainted.difference(canaries)
}
// Remove disconnected allocations so they won't be stopped
knownUntainted := untainted.filterOutByClientStatus(structs.AllocClientStatusUnknown)
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
remove := len(knownUntainted) + len(migrate) - group.Count
if remove <= 0 {
return stop
}
@ -1072,7 +1091,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)
@ -1111,14 +1130,11 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al
// Find replacement allocations and decide which one to stop. A
// reconnecting allocation may have multiple replacements.
for _, replacementAlloc := range others {
for _, replacementAlloc := range all {
// Skip allocations that are not a replacement of the one
// reconnecting. Replacement allocations have the same name but a
// higher CreateIndex and a different ID.
isReplacement := replacementAlloc.ID != reconnectingAlloc.ID &&
replacementAlloc.Name == reconnectingAlloc.Name &&
replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex
// reconnecting.
isReplacement := replacementAlloc.ID == reconnectingAlloc.NextAllocation
// Skip allocations that are server terminal.
// We don't want to replace a reconnecting allocation with one that
@ -1142,7 +1158,8 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al
}
} else {
// The reconnecting allocation is preferred, so stop this
// replacement.
// replacement, but avoid re-stopping stopped allocs
if replacementAlloc.ClientStatus != structs.AllocClientStatusFailed {
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
@ -1151,6 +1168,7 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al
}
}
}
}
// Any reconnecting allocation not set to stop must be reconnected.
for _, alloc := range reconnecting {
@ -1235,11 +1253,17 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
for _, laterAlloc := range rescheduleLater {
existingAlloc := all[laterAlloc.alloc.ID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
// Can't updated an allocation that is disconnected
if _, ok := a.result.disconnectUpdates[laterAlloc.allocID]; !ok {
a.result.attributeUpdates[laterAlloc.allocID] = updatedAlloc
} else {
a.result.disconnectUpdates[laterAlloc.allocID].FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
}
}
}
@ -1355,8 +1379,8 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
}
timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error("error computing disconnecting timeouts for task_group",
if err != nil {
a.logger.Error("error for task_group",
"task_group", tgName, "error", err)
return map[string]string{}
}

View File

@ -5327,7 +5327,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
nodeScoreIncrement float64
disconnectedAllocStatus string
disconnectedAllocStates []*structs.AllocState
serverDesiredStatus string
isBatch bool
nodeStatusDisconnected bool
replace bool
@ -5347,8 +5346,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
reconnectUpdates: 2,
@ -5365,8 +5364,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
stop: 1,
@ -5385,8 +5384,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeScoreIncrement: 1,
expected: &resultExpectation{
@ -5405,8 +5404,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 2,
@ -5424,8 +5423,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 2,
@ -5445,13 +5444,15 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusComplete,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
isBatch: true,
expected: &resultExpectation{
place: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 2,
Place: 2,
},
},
},
@ -5463,15 +5464,14 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
failReplacement: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
expected: &resultExpectation{
reconnectUpdates: 2,
stop: 2,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 3,
Stop: 2,
Ignore: 5,
},
},
},
@ -5483,8 +5483,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
disconnectReplacement: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
expected: &resultExpectation{
reconnectUpdates: 1,
stop: 1,
@ -5503,8 +5503,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
taintReplacement: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
expected: &resultExpectation{
reconnectUpdates: 2,
stop: 2,
@ -5522,8 +5522,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
@ -5542,8 +5542,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
@ -5564,16 +5564,17 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replaceFailedReplacement: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
shouldStopOnDisconnectedNode: false,
jobVersionIncrement: 1,
expected: &resultExpectation{
stop: 2,
reconnectUpdates: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 2,
Ignore: 5,
Ignore: 7,
},
},
},
@ -5584,8 +5585,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusPending,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
expected: &resultExpectation{
@ -5605,17 +5606,17 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
failReplacement: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 4,
stop: 2,
place: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 4,
Stop: 2,
Place: 2,
Ignore: 3,
Ignore: 5,
},
},
},
@ -5627,7 +5628,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusUnknown,
disconnectedAllocStates: disconnectAllocState,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
maxDisconnect: pointer.Of(2 * time.Second),
@ -5648,7 +5648,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
disconnectedAllocStates: []*structs.AllocState{},
serverDesiredStatus: structs.AllocDesiredStatusRun,
nodeStatusDisconnected: true,
expected: &resultExpectation{
place: 2,
@ -5687,7 +5686,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
// Set alloc state
disconnectedAllocCount := tc.disconnectedAllocCount
for _, alloc := range allocs {
alloc.DesiredStatus = tc.serverDesiredStatus
alloc.DesiredStatus = structs.AllocDesiredStatusRun
if tc.maxDisconnect != nil {
alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect
@ -5699,7 +5698,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
// Set the node id on all the disconnected allocs to the node under test.
alloc.NodeID = testNode.ID
alloc.NodeName = "disconnected"
disconnectedAllocCount--
}
}
@ -5791,6 +5789,12 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
// Tests that a client disconnect while a canary is in progress generates the result.
func TestReconciler_Client_Disconnect_Canaries(t *testing.T) {
disconnectAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: time.Now(),
}}
type testCase struct {
name string
nodes []string
@ -5883,7 +5887,7 @@ func TestReconciler_Client_Disconnect_Canaries(t *testing.T) {
updatedJob.TaskGroups[0].Name: {
Place: 3,
Canary: 0,
Ignore: 3,
Ignore: 6,
},
},
},
@ -5947,7 +5951,7 @@ func TestReconciler_Client_Disconnect_Canaries(t *testing.T) {
updatedJob.TaskGroups[0].Name: {
Place: 2,
Canary: 0,
Ignore: 4,
Ignore: 7,
},
},
},
@ -6013,7 +6017,7 @@ func TestReconciler_Client_Disconnect_Canaries(t *testing.T) {
updatedJob.TaskGroups[0].Name: {
Place: 2,
Canary: 0,
Ignore: 3,
Ignore: 6,
// The 2 stops in this test are transient failures, but
// the deployment can still progress. We don't include
// them in the stop count since DesiredTGUpdates is used
@ -6083,6 +6087,12 @@ func TestReconciler_Client_Disconnect_Canaries(t *testing.T) {
if alloc.ClientStatus == structs.AllocClientStatusRunning {
alloc.DeploymentStatus.Healthy = pointer.Of(true)
}
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
alloc.AllocStates = disconnectAllocState
alloc.FollowupEvalID = "eval-where-it-was-set-to-unknow"
}
tc.deploymentState.PlacedCanaries = append(tc.deploymentState.PlacedCanaries, alloc.ID)
handled[alloc.ID] = allocUpdateFnIgnore
canariesConfigured++

View File

@ -9,6 +9,7 @@ package scheduler
// all scheduler types before moving it into util.go
import (
"errors"
"fmt"
"sort"
"strings"
@ -264,10 +265,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
taintedNode, nodeIsTainted := taintedNodes[alloc.NodeID]
if taintedNode != nil {
// Group disconnecting/reconnecting
// Group disconnecting
switch taintedNode.Status {
case structs.NodeStatusDisconnected:
if supportsDisconnectedClients {
// Filter running allocs on a node that is disconnected to be marked as unknown.
if alloc.ClientStatus == structs.AllocClientStatusRunning {
disconnecting[alloc.ID] = alloc
@ -289,6 +291,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
lost[alloc.ID] = alloc
continue
}
reconnecting[alloc.ID] = alloc
continue
}
@ -296,9 +299,16 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
}
}
if alloc.TerminalStatus() && !reconnect {
// Terminal allocs, if supportsDisconnectedClient and not reconnect,
// are probably stopped replacements and should be ignored
if supportsDisconnectedClients {
ignore[alloc.ID] = alloc
continue
}
// Terminal allocs, if not reconnect, are always untainted as they
// should never be migrated.
if alloc.TerminalStatus() && !reconnect {
untainted[alloc.ID] = alloc
continue
}
@ -315,11 +325,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
continue
}
// Ignore unknown allocs that we want to reconnect eventually.
// Acknowledge unknown allocs that we want to reconnect eventually.
if supportsDisconnectedClients &&
alloc.ClientStatus == structs.AllocClientStatusUnknown &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun {
ignore[alloc.ID] = alloc
untainted[alloc.ID] = alloc
continue
}
@ -366,12 +376,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (allocSet, allocSet, []*delayedRescheduleInfo) {
untainted := make(map[string]*structs.Allocation)
rescheduleNow := make(map[string]*structs.Allocation)
rescheduleLater := []*delayedRescheduleInfo{}
// When filtering disconnected sets, the untainted set is never populated.
// It has no purpose in that context.
for _, alloc := range a {
// Ignore disconnecting allocs that are already unknown. This can happen
// in the case of canaries that are interrupted by a disconnect.
@ -393,25 +402,27 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
}
if isUntainted || ignore {
if ignore {
continue
}
// Only failed allocs with desired state run get to this point
// If the failed alloc is not eligible for rescheduling now we
// add it to the untainted set. Disconnecting delay evals are
// handled by allocReconciler.createTimeoutLaterEvals
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting)
if !isDisconnecting && !eligibleNow {
if eligibleNow {
rescheduleNow[alloc.ID] = alloc
continue
}
// If the failed alloc is not eligible for rescheduling now we
// add it to the untainted set.
untainted[alloc.ID] = alloc
if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime})
}
} else {
rescheduleNow[alloc.ID] = alloc
}
}
return
return untainted, rescheduleNow, rescheduleLater
}
// shouldFilter returns whether the alloc should be ignored or considered untainted.
@ -436,32 +447,31 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo
if alloc.RanSuccessfully() {
return true, false
}
return false, true
case structs.AllocDesiredStatusEvict:
return false, true
default:
}
switch alloc.ClientStatus {
case structs.AllocClientStatusFailed:
default:
return true, false
}
return false, false
}
return true, false
}
// Handle service jobs
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
return false, true
default:
}
switch alloc.ClientStatus {
case structs.AllocClientStatusComplete, structs.AllocClientStatusLost:
return false, true
default:
}
return false, false
}
@ -481,9 +491,15 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
// Reschedule if the eval ID matches the alloc's followup evalID or if its close to its reschedule time
var eligible bool
if isDisconnecting {
rescheduleTime, eligible = alloc.NextRescheduleTimeByFailTime(now)
} else {
switch {
case isDisconnecting:
rescheduleTime, eligible = alloc.NextRescheduleTimeByTime(now)
case alloc.ClientStatus == structs.AllocClientStatusUnknown && alloc.FollowupEvalID == evalID:
lastDisconnectTime := alloc.LastUnknown()
rescheduleTime, eligible = alloc.NextRescheduleTimeByTime(lastDisconnectTime)
default:
rescheduleTime, eligible = alloc.NextRescheduleTime()
}
@ -491,9 +507,11 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
rescheduleNow = true
return
}
if eligible && alloc.FollowupEvalID == "" {
if eligible && (alloc.FollowupEvalID == "" || isDisconnecting) {
rescheduleLater = true
}
return
}
@ -547,12 +565,13 @@ func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedReschedule
// delayByMaxClientDisconnect returns a delay for any unknown allocation
// that's got a max_client_reconnect configured
func (a allocSet) delayByMaxClientDisconnect(now time.Time) (later []*delayedRescheduleInfo, err error) {
func (a allocSet) delayByMaxClientDisconnect(now time.Time) ([]*delayedRescheduleInfo, error) {
var later []*delayedRescheduleInfo
for _, alloc := range a {
timeout := alloc.DisconnectTimeout(now)
if !timeout.After(now) {
continue
return nil, errors.New("unable to computing disconnecting timeouts")
}
later = append(later, &delayedRescheduleInfo{
@ -562,7 +581,19 @@ func (a allocSet) delayByMaxClientDisconnect(now time.Time) (later []*delayedRes
})
}
return
return later, nil
}
// filterOutByClientStatus returns all allocs from the set without the specified client status.
func (a allocSet) filterOutByClientStatus(clientStatus string) allocSet {
allocs := make(allocSet)
for _, alloc := range a {
if alloc.ClientStatus != clientStatus {
allocs[alloc.ID] = alloc
}
}
return allocs
}
// filterByClientStatus returns allocs from the set with the specified client status.

View File

@ -237,7 +237,6 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
},
},
{
name: "disco-client-disconnect-unset-max-disconnect",
supportsDisconnectedClients: true,
@ -273,7 +272,6 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
},
},
// Everything below this line tests the disconnected client mode.
{
name: "disco-client-untainted-reconnect-failed-and-replaced",
@ -381,10 +379,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
// Allocs on reconnected nodes that are complete are untainted
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
// Allocs on reconnected nodes that are complete are ignored
"ignored-reconnect-complete": {
ID: "ignored-reconnect-complete",
Name: "ignored-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
@ -405,9 +403,9 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
},
// Lost allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
"ignored-reconnect-lost": {
ID: "ignored-reconnect-lost",
Name: "ignored-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
@ -415,10 +413,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
TaskGroup: "web",
AllocStates: unknownAllocState,
},
// Replacement allocs that are complete are untainted
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
// Replacement allocs that are complete are ignored
"ignored-reconnect-complete-replacement": {
ID: "ignored-reconnect-complete-replacement",
Name: "ignored-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
@ -427,10 +425,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
// Replacement allocs on reconnected nodes that are failed are untainted
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
// Replacement allocs on reconnected nodes that are failed are ignored
"ignored-reconnect-failed-replacement": {
ID: "ignored-reconnect-failed-replacement",
Name: "ignored-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
@ -439,63 +437,9 @@ func TestAllocSet_filterByTainted(t *testing.T) {
PreviousAllocation: "reconnecting-failed",
},
// Lost replacement allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
untainted: allocSet{
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "reconnecting-failed",
},
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
"ignored-reconnect-lost-replacement": {
ID: "ignored-reconnect-lost-replacement",
Name: "ignored-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
@ -505,6 +449,7 @@ func TestAllocSet_filterByTainted(t *testing.T) {
PreviousAllocation: "untainted-reconnect-lost",
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
@ -519,7 +464,61 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
},
},
ignore: allocSet{},
ignore: allocSet{
"ignored-reconnect-complete": {
ID: "ignored-reconnect-complete",
Name: "ignored-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
"ignored-reconnect-lost": {
ID: "ignored-reconnect-lost",
Name: "ignored-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
"ignored-reconnect-complete-replacement": {
ID: "ignored-reconnect-complete-replacement",
Name: "ignored-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
"ignored-reconnect-failed-replacement": {
ID: "ignored-reconnect-failed-replacement",
Name: "ignored-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "reconnecting-failed",
},
"ignored-reconnect-lost-replacement": {
ID: "ignored-reconnect-lost-replacement",
Name: "ignored-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
lost: allocSet{},
},
{
@ -539,10 +538,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
NodeID: "disconnected",
TaskGroup: "web",
},
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
// Unknown allocs on disconnected nodes are acknowledge, so they wont be rescheduled again
"untainted-unknown": {
ID: "untainted-unknown",
Name: "untainted-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
@ -595,7 +594,19 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
},
},
untainted: allocSet{},
untainted: allocSet{
// Unknown allocs on disconnected nodes are acknowledge, so they wont be rescheduled again
"untainted-unknown": {
ID: "untainted-unknown",
Name: "untainted-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
},
migrate: allocSet{},
disconnecting: allocSet{
"disconnect-running": {
@ -610,17 +621,6 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
reconnecting: allocSet{},
ignore: allocSet{
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
"ignore-reconnected-failed-stopped": {
ID: "ignore-reconnected-failed-stopped",
Name: "ignore-reconnected-failed-stopped",
@ -1167,3 +1167,202 @@ func Test_allocNameIndex_Next(t *testing.T) {
})
}
}
func TestAllocSet_filterByRescheduleable(t *testing.T) {
ci.Parallel(t)
noRescheduleJob := mock.Job()
noRescheduleTG := &structs.TaskGroup{
Name: "noRescheduleTG",
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 0,
Unlimited: false,
},
}
noRescheduleJob.TaskGroups[0] = noRescheduleTG
testJob := mock.Job()
rescheduleTG := &structs.TaskGroup{
Name: "rescheduleTG",
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 1,
Unlimited: false,
},
}
testJob.TaskGroups[0] = rescheduleTG
now := time.Now()
type testCase struct {
name string
all allocSet
isBatch bool
supportsDisconnectedClients bool
isDisconnecting bool
deployment *structs.Deployment
// expected results
untainted allocSet
resNow allocSet
resLater []*delayedRescheduleInfo
}
testCases := []testCase{
{
name: "batch disconnecting allocation no reschedule",
isDisconnecting: true,
isBatch: true,
all: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
untainted: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
},
{
name: "batch ignore unknown disconnecting allocs",
isDisconnecting: true,
isBatch: true,
all: allocSet{
"disconnecting1": {
ID: "disconnection1",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
},
{
name: "batch disconnecting allocation reschedule",
isDisconnecting: true,
isBatch: true,
all: allocSet{
"rescheduleNow1": {
ID: "rescheduleNow1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
TaskGroup: "rescheduleTG",
},
},
untainted: allocSet{},
resNow: allocSet{
"rescheduleNow1": {
ID: "rescheduleNow1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
TaskGroup: "rescheduleTG",
},
},
resLater: []*delayedRescheduleInfo{},
},
{
name: "service disconnecting allocation no reschedule",
isDisconnecting: true,
isBatch: false,
all: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
untainted: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
},
{
name: "service disconnecting allocation reschedule",
isDisconnecting: true,
isBatch: false,
all: allocSet{
"rescheduleNow1": {
ID: "rescheduleNow1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
TaskGroup: "rescheduleTG",
},
},
untainted: allocSet{},
resNow: allocSet{
"rescheduleNow1": {
ID: "rescheduleNow1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
TaskGroup: "rescheduleTG",
},
},
resLater: []*delayedRescheduleInfo{},
},
{
name: "service ignore unknown disconnecting allocs",
isDisconnecting: true,
isBatch: false,
all: allocSet{
"disconnecting1": {
ID: "disconnection1",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
},
{
name: "service running allocation no reschedule",
isDisconnecting: false,
isBatch: true,
all: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
untainted: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: noRescheduleJob,
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
untainted, resNow, resLater := tc.all.filterByRescheduleable(tc.isBatch,
tc.isDisconnecting, now, "evailID", tc.deployment)
must.Eq(t, tc.untainted, untainted, must.Sprintf("with-nodes: untainted"))
must.Eq(t, tc.resNow, resNow, must.Sprintf("with-nodes: reschedule-now"))
must.Eq(t, tc.resLater, resLater, must.Sprintf("with-nodes: rescheduleLater"))
})
}
}