reconciler: support disconnected clients (#12058)

* Add merge helper for string maps
* structs: add statuses, MaxClientDisconnect, and helper funcs
* taintedNodes: Include disconnected nodes
* upsertAllocsImpl: don't use existing ClientStatus when upserting unknown
* allocSet: update filterByTainted and add delayByMaxClientDisconnect
* allocReconciler: support disconnecting and reconnecting allocs
* GenericScheduler: upsert unknown and queue reconnecting

Co-authored-by: Tim Gross <tgross@hashicorp.com>
Derek Strickland 2022-02-16 13:50:20 -05:00 committed by DerekStrickland
parent a6801f73d1
commit b128769e19
11 changed files with 944 additions and 56 deletions

View File

@ -352,6 +352,29 @@ func CopyMapStringInterface(m map[string]interface{}) map[string]interface{} {
return c
}
// MergeMapStringString will merge two maps into one. If a duplicate key exists,
// the value in the second map replaces the value in the first map. If both
// maps are empty or nil, this returns an empty map.
func MergeMapStringString(m map[string]string, n map[string]string) map[string]string {
if len(m) == 0 && len(n) == 0 {
return map[string]string{}
}
if len(m) == 0 {
return n
}
if len(n) == 0 {
return m
}
result := CopyMapStringString(m)
for k, v := range n {
result[k] = v
}
return result
}
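The test cases further down only exercise disjoint keys, so here is a minimal usage sketch of the override behavior described in the comment above. The import path github.com/hashicorp/nomad/helper and the example key/value pairs are assumptions for illustration, not part of this change.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	base := map[string]string{"region": "us-east-1", "tier": "web"}
	override := map[string]string{"tier": "cache"}

	// The second map wins on duplicate keys; neither input is mutated here
	// because the result is built from a copy of the first map.
	merged := helper.MergeMapStringString(base, override)
	fmt.Println(merged) // map[region:us-east-1 tier:cache]
}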
func CopyMapStringInt(m map[string]int) map[string]int {
l := len(m)
if l == 0 {

View File

@ -207,6 +207,27 @@ func TestCopyMapSliceInterface(t *testing.T) {
require.False(t, reflect.DeepEqual(m, c))
}
func TestMergeMapStringString(t *testing.T) {
type testCase struct {
map1 map[string]string
map2 map[string]string
expected map[string]string
}
cases := []testCase{
{map[string]string{"foo": "bar"}, map[string]string{"baz": "qux"}, map[string]string{"foo": "bar", "baz": "qux"}},
{map[string]string{"foo": "bar"}, nil, map[string]string{"foo": "bar"}},
{nil, map[string]string{"baz": "qux"}, map[string]string{"baz": "qux"}},
{nil, nil, map[string]string{}},
}
for _, c := range cases {
if output := MergeMapStringString(c.map1, c.map2); !CompareMapStringString(output, c.expected) {
t.Errorf("MergeMapStringString(%q, %q) -> %q != %q", c.map1, c.map2, output, c.expected)
}
}
}
func TestCleanEnvVar(t *testing.T) {
type testCase struct {
input string

View File

@ -3528,9 +3528,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
// Keep the clients task states
alloc.TaskStates = exist.TaskStates
// If the scheduler is marking this allocation as lost we do not
// If the scheduler is marking this allocation as lost or unknown we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost {
if alloc.ClientStatus != structs.AllocClientStatusLost &&
alloc.ClientStatus != structs.AllocClientStatusUnknown {
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}

View File

@ -1694,16 +1694,17 @@ func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent {
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusDown = "down"
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusDown = "down"
NodeStatusDisconnected = "disconnected"
)
// ShouldDrainNode checks if a given node status should trigger an
// evaluation. Some states don't require any further action.
func ShouldDrainNode(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady:
case NodeStatusInit, NodeStatusReady, NodeStatusDisconnected:
return false
case NodeStatusDown:
return true
@ -1715,7 +1716,7 @@ func ShouldDrainNode(status string) bool {
// ValidNodeStatus is used to check if a node status is valid
func ValidNodeStatus(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady, NodeStatusDown:
case NodeStatusInit, NodeStatusReady, NodeStatusDown, NodeStatusDisconnected:
return true
default:
return false
@ -6165,6 +6166,10 @@ type TaskGroup struct {
// StopAfterClientDisconnect, if set, configures the client to stop the task group
// after this duration since the last known good heartbeat
StopAfterClientDisconnect *time.Duration
// MaxClientDisconnect, if set, configures the client to allow placed
// allocations for tasks in this group to attempt to resume running without a restart.
MaxClientDisconnect *time.Duration
}
func (tg *TaskGroup) Copy() *TaskGroup {
@ -9424,6 +9429,7 @@ const (
AllocClientStatusComplete = "complete"
AllocClientStatusFailed = "failed"
AllocClientStatusLost = "lost"
AllocClientStatusUnknown = "unknown"
)
// Allocation is used to allocate the placement of a task group to a node.
@ -9874,6 +9880,26 @@ func (a *Allocation) WaitClientStop() time.Time {
return t.Add(*tg.StopAfterClientDisconnect + kill)
}
// DisconnectTimeout uses the MaxClientDisconnect to compute when the allocation
// should transition to lost.
func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
if a == nil || a.Job == nil {
return now
}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
// Prefer the duration from the task group.
timeout := tg.MaxClientDisconnect
// If not configured, return now
if timeout == nil {
return now
}
return now.Add(*timeout)
}
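One detail worth calling out: when max_client_disconnect is not configured, DisconnectTimeout returns `now`, which lets callers treat the reconnect window as already expired. A minimal sketch of that usage, written as if it lived alongside the structs package; the helper name disconnectExpired is hypothetical and not part of this change.

// disconnectExpired is a hypothetical helper: it reports whether an
// allocation on a disconnected node has outlived its reconnect window.
// With no max_client_disconnect configured, DisconnectTimeout returns
// `now`, so the window counts as already passed.
func disconnectExpired(a *Allocation, now time.Time) bool {
	return !a.DisconnectTimeout(now).After(now)
}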
// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
@ -10374,6 +10400,15 @@ func (a *AllocMetric) PopulateScoreMetaData() {
}
}
// MaxNormScore returns the ScoreMetaData entry with the highest normalized
// score.
func (a *AllocMetric) MaxNormScore() *NodeScoreMeta {
if a == nil || len(a.ScoreMetaData) == 0 {
return nil
}
return a.ScoreMetaData[0]
}
// NodeScoreMeta captures scoring meta data derived from
// different scoring factors.
type NodeScoreMeta struct {
@ -10502,21 +10537,22 @@ const (
)
const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeDrain = "node-drain"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerAllocStop = "alloc-stop"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerDeploymentWatcher = "deployment-watcher"
EvalTriggerFailedFollowUp = "failed-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerQueuedAllocs = "queued-allocs"
EvalTriggerPreemption = "preemption"
EvalTriggerScaling = "job-scaling"
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeDrain = "node-drain"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerAllocStop = "alloc-stop"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerDeploymentWatcher = "deployment-watcher"
EvalTriggerFailedFollowUp = "failed-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerQueuedAllocs = "queued-allocs"
EvalTriggerPreemption = "preemption"
EvalTriggerScaling = "job-scaling"
EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout"
)
const (
@ -10618,7 +10654,8 @@ type Evaluation struct {
Wait time.Duration
// WaitUntil is the time when this eval should be run. This is used to
// supported delayed rescheduling of failed allocations
// support delayed rescheduling of failed allocations, and delayed
// stopping of allocations that are configured with max_client_disconnect.
WaitUntil time.Time
// NextEval is the evaluation ID for the eval created to do a followup.
@ -11133,6 +11170,17 @@ func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string)
p.NodePreemptions[node] = append(existing, newAlloc)
}
// AppendUnknownAlloc marks an allocation as unknown.
func (p *Plan) AppendUnknownAlloc(alloc *Allocation) {
// Strip the job as it's set once on the ApplyPlanResultRequest.
alloc.Job = nil
// Strip the resources as they can be rebuilt.
alloc.Resources = nil
existing := p.NodeAllocation[alloc.NodeID]
p.NodeAllocation[alloc.NodeID] = append(existing, alloc)
}
func (p *Plan) PopUpdate(alloc *Allocation) {
existing := p.NodeUpdate[alloc.NodeID]
n := len(existing)

View File

@ -33,6 +33,9 @@ const (
// allocLost is the status used when an allocation is lost
allocLost = "alloc is lost since its node is down"
// allocUnknown is the status used when an allocation is unknown
allocUnknown = "alloc is unknown since its node is disconnected"
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"
@ -55,6 +58,11 @@ const (
// up evals for delayed rescheduling
reschedulingFollowupEvalDesc = "created for delayed rescheduling"
// disconnectTimeoutFollowupEvalDesc is the description used when creating follow
// up evals for allocations that should be stopped after their disconnect
// timeout has passed.
disconnectTimeoutFollowupEvalDesc = "created for delayed disconnect timeout"
// maxPastRescheduleEvents is the maximum number of past reschedule event
// that we track when unlimited rescheduling is enabled
maxPastRescheduleEvents = 5
@ -148,7 +156,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) {
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption,
structs.EvalTriggerScaling:
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@ -392,6 +400,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
}
// Handle disconnect updates
for _, update := range results.disconnectUpdates {
s.plan.AppendUnknownAlloc(update)
}
// Handle the in-place updates
for _, update := range results.inplaceUpdate {
if update.DeploymentID != s.deployment.GetID() {
@ -406,6 +419,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}
// Handle reconnect updates
for _, update := range results.reconnectUpdates {
s.ctx.Plan().AppendAlloc(update, nil)
}
// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise

View File

@ -6514,3 +6514,115 @@ func TestPropagateTaskState(t *testing.T) {
})
}
}
// Tests that a client disconnect generates attribute updates and follow up evals.
func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T) {
h := NewHarness(t)
count := 1
maxClientDisconnect := 10 * time.Minute
disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, count, maxClientDisconnect,
structs.NodeStatusReady, structs.AllocClientStatusRunning)
// Now disconnect the node
disconnectedNode.Status = structs.NodeStatusDisconnected
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
// Create an evaluation triggered by the disconnect
evals := []*structs.Evaluation{{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}
nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
// Process the evaluation
err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
require.NoError(t, err)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")
// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)
// Wait for the followup eval to appear in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}
require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)
return true, nil
}, func(err error) {
require.NoError(t, err)
})
// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.
for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}
// Simulate that NodeAllocation got processed.
err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID])
require.NoError(t, err, "plan.NodeUpdate")
// Validate that the StateStore Upsert applied the ClientStatus we specified.
for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)
require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)
// Allocations have been transitioned to unknown
require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
}
}
func initNodeAndAllocs(t *testing.T, h *Harness, allocCount int,
maxClientDisconnect time.Duration, nodeStatus, clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) {
// Node, with the given status
node := mock.Node()
node.Status = nodeStatus
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Job with allocations and max_client_disconnect
job := mock.Job()
job.TaskGroups[0].Count = allocCount
job.TaskGroups[0].MaxClientDisconnect = &maxClientDisconnect
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
allocs := make([]*structs.Allocation, allocCount)
for i := 0; i < allocCount; i++ {
// Alloc for the running group
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = clientStatus
allocs[i] = alloc
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
return node, job, allocs
}

View File

@ -114,6 +114,14 @@ type reconcileResults struct {
// jobspec change.
attributeUpdates map[string]*structs.Allocation
// disconnectUpdates is the set of allocations that are on disconnected nodes, but
// have not yet had their ClientStatus set to AllocClientStatusUnknown.
disconnectUpdates map[string]*structs.Allocation
// reconnectUpdates is the set of allocations that have ClientStatus set to
// AllocClientStatusUnknown, but the associated Node has reconnected.
reconnectUpdates map[string]*structs.Allocation
// desiredTGUpdates captures the desired set of changes to make for each
// task group.
desiredTGUpdates map[string]*structs.DesiredUpdates
@ -178,6 +186,8 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
now: time.Now(),
result: &reconcileResults{
attributeUpdates: make(map[string]*structs.Allocation),
disconnectUpdates: make(map[string]*structs.Allocation),
reconnectUpdates: make(map[string]*structs.Allocation),
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
},
@ -326,11 +336,15 @@ func (a *allocReconciler) handleStop(m allocMatrix) {
}
}
// filterAndStopAll stops all allocations in an allocSet. This is useful when
// stopping an entire job or task group.
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
untainted, migrate, lost := set.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
a.markStop(disconnecting, "", allocNotNeeded)
a.markStop(reconnecting, "", allocNotNeeded)
return uint64(len(set))
}
@ -387,7 +401,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
canaries, all := a.cancelUnneededCanaries(all, desiredChanges)
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes)
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
@ -396,7 +410,14 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name)
// Create batched follow up evaluations for allocations that are
// Find delays for any disconnecting allocs that have max_client_disconnect configured,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)
// Create batched follow-up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name)
@ -408,10 +429,13 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Validate and add reconnecting allocs to the plan so that they will be synced by the client on next poll.
a.computeReconnecting(reconnecting)
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@ -442,9 +466,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * If there are any canaries that they have been promoted
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
// * An alloc was lost
// * There is not a corresponding reconnecting alloc.
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@ -579,7 +604,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
}
canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@ -639,7 +664,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
//
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
isCanarying bool) []allocPlaceResult {
// Add rescheduled placement results
@ -659,7 +684,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
}
// Add replacements for lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule)
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting)
for _, alloc := range lost {
if existing >= group.Count {
@ -859,7 +884,7 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
// Mark all lost allocations for stop.
var stop allocSet
@ -872,7 +897,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
}
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
if remove <= 0 {
return stop
}
@ -925,6 +950,14 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
}
}
// Handle allocs that might be able to reconnect.
if len(reconnecting) != 0 {
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop
}
}
// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
@ -962,6 +995,82 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
return stop
}
// computeStopByReconnecting moves allocations from either the untainted or reconnecting
// sets to the stop set and returns the number of allocations that still need to be removed.
func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
if remove == 0 {
return remove
}
for _, reconnectingAlloc := range reconnecting {
// if the desired status is not run, or if the user-specified desired
// transition is not run, stop the allocation.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
delete(reconnecting, reconnectingAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
}
// Compare reconnecting to untainted and decide which to keep.
for _, untaintedAlloc := range untainted {
// If not a match by name go to next
if reconnectingAlloc.Name != untaintedAlloc.Name {
continue
}
// By default, we prefer stopping the replacement alloc unless
// the replacement has a higher metrics score.
stopAlloc := untaintedAlloc
deleteSet := untainted
untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
if untaintedMaxScoreMeta == nil {
a.logger.Error(fmt.Sprintf("error computing stop: replacement allocation metrics not available for alloc.name %q", untaintedAlloc.Name))
continue
}
if reconnectingMaxScoreMeta == nil {
a.logger.Error(fmt.Sprintf("error computing stop: reconnecting allocation metrics not available for alloc.name %q", reconnectingAlloc.Name))
continue
}
if untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
}
stop[stopAlloc.ID] = stopAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: stopAlloc,
statusDescription: allocNotNeeded,
})
delete(deleteSet, stopAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
}
}
return remove
}
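To make the tiebreak concrete: by default the replacement (untainted) copy is stopped, and only a strictly higher replacement score stops the reconnecting alloc instead. A small illustrative sketch using the MaxNormScore helper added earlier in this change; the scores, node IDs, and function name are made up.

// reconnectTiebreakExample mirrors the preference rule above with
// hard-coded scores; illustrative only, not part of this change.
func reconnectTiebreakExample() string {
	reconnecting := &structs.AllocMetric{
		ScoreMetaData: []*structs.NodeScoreMeta{{NodeID: "node-a", NormScore: 0.5}},
	}
	replacement := &structs.AllocMetric{
		ScoreMetaData: []*structs.NodeScoreMeta{{NodeID: "node-b", NormScore: 0.9}},
	}

	// Stop the reconnecting alloc only when the replacement scored
	// strictly higher; otherwise the replacement is stopped.
	if replacement.MaxNormScore().NormScore > reconnecting.MaxNormScore().NormScore {
		return "stop reconnecting alloc" // taken here, since 0.9 > 0.5
	}
	return "stop replacement alloc"
}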
// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
@ -1005,7 +1114,33 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
}
}
// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
// computeReconnecting copies existing allocations in the unknown state whose
// nodes have been identified as ready. Allocs whose DesiredStatus is still
// run are appended to the Plan as non-destructive updates. Clients are
// responsible for reconciling the DesiredState with the actual state as the
// node comes back online.
func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
if len(reconnecting) == 0 {
return
}
// Create updates that will be appended to the plan.
for _, alloc := range reconnecting {
// If the user has defined a DesiredTransition don't resume the alloc.
if alloc.DesiredTransition.ShouldMigrate() || alloc.DesiredTransition.ShouldReschedule() || alloc.DesiredTransition.ShouldForceReschedule() {
continue
}
// If the scheduler has defined a terminal DesiredStatus don't resume the alloc.
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
continue
}
a.result.reconnectUpdates[alloc.ID] = alloc.Copy()
}
}
// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
// lost allocations. followupEvals are appended to a.result as a side effect; we return a
// map of alloc IDs to their followupEval IDs.
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
@ -1062,12 +1197,101 @@ func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedResched
emitRescheduleInfo(allocReschedInfo.alloc, eval)
}
a.result.desiredFollowupEvals[tgName] = evals
a.appendFollowupEvals(tgName, evals)
return allocIDToFollowupEvalID
}
// emitRescheduleInfo emits metrics about the reschedule decision of an evaluation. If a followup evaluation is
// createTimeoutLaterEvals creates followup evaluations with the
// WaitUntil field set for allocations in an unknown state on disconnected nodes.
// Followup Evals are appended to a.result as a side effect. It returns a map of
// allocIDs to their associated followUpEvalIDs.
func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName string) map[string]string {
if len(disconnecting) == 0 {
return map[string]string{}
}
timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error(fmt.Sprintf("error computing disconnecting timeouts for task_group.name %q: %s", tgName, err))
return map[string]string{}
}
// Sort by time
sort.Slice(timeoutDelays, func(i, j int) bool {
return timeoutDelays[i].rescheduleTime.Before(timeoutDelays[j].rescheduleTime)
})
var evals []*structs.Evaluation
nextReschedTime := timeoutDelays[0].rescheduleTime
allocIDToFollowupEvalID := make(map[string]string, len(timeoutDelays))
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
StatusDescription: disconnectTimeoutFollowupEvalDesc,
WaitUntil: nextReschedTime,
}
evals = append(evals, eval)
// Important to remember that these are sorted. The rescheduleTime can only
// get farther into the future. If this loop detects the next delay is greater
// than the batch window (5s) it creates another batch.
for _, timeoutInfo := range timeoutDelays {
if timeoutInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID
} else {
// Start a new batch
nextReschedTime = timeoutInfo.rescheduleTime
// Create a new eval for the new batch
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
StatusDescription: disconnectTimeoutFollowupEvalDesc,
WaitUntil: timeoutInfo.rescheduleTime,
}
evals = append(evals, eval)
allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID
}
// Create updates that will be applied to the allocs to mark the FollowupEvalID
// and the unknown ClientStatus.
updatedAlloc := timeoutInfo.alloc.Copy()
updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown
updatedAlloc.ClientDescription = allocUnknown
updatedAlloc.FollowupEvalID = eval.ID
a.result.disconnectUpdates[updatedAlloc.ID] = updatedAlloc
}
a.appendFollowupEvals(tgName, evals)
return allocIDToFollowupEvalID
}
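For intuition on the batching: with the 5 second batchedFailedAllocWindowSize, timeouts at t, t+2s, and t+10s produce two follow-up evals, the first with WaitUntil=t covering the first two allocs and the second with WaitUntil=t+10s. Below is a standalone sketch of the same grouping rule; the function name and the slice-of-times representation are illustrative, not part of this change.

// groupByWindow sketches the batching rule used above: the sorted timeouts
// are walked in order, and a new batch (eval) starts whenever the next
// timeout is at least `window` later than the current batch's WaitUntil.
func groupByWindow(sorted []time.Time, window time.Duration) [][]time.Time {
	if len(sorted) == 0 {
		return nil
	}
	batches := [][]time.Time{{sorted[0]}}
	waitUntil := sorted[0]
	for _, ts := range sorted[1:] {
		if ts.Sub(waitUntil) < window {
			// Same batch: the existing eval's WaitUntil is reused.
			batches[len(batches)-1] = append(batches[len(batches)-1], ts)
			continue
		}
		// New batch: a fresh eval with WaitUntil set to this timeout.
		waitUntil = ts
		batches = append(batches, []time.Time{ts})
	}
	return batches
}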
// appendFollowupEvals appends a set of followup evals for a task group to the
// desiredFollowupEvals map which is later added to the scheduler's followUpEvals set.
func (a *allocReconciler) appendFollowupEvals(tgName string, evals []*structs.Evaluation) {
// Merge with any existing followup evals for this task group.
if existingFollowUpEvals, ok := a.result.desiredFollowupEvals[tgName]; ok {
evals = append(existingFollowUpEvals, evals...)
}
a.result.desiredFollowupEvals[tgName] = evals
}
// emitRescheduleInfo emits metrics about the rescheduling decision of an evaluation. If a followup evaluation is
// provided, the waitUntil time is emitted.
func emitRescheduleInfo(alloc *structs.Allocation, followupEval *structs.Evaluation) {
// Emit short-lived metrics data point. Note, these expire and stop emitting after about a minute.

View File

@ -257,6 +257,8 @@ type resultExpectation struct {
destructive int
inplace int
attributeUpdates int
disconnectUpdates int
reconnectUpdates int
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
}
@ -283,10 +285,61 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
assertion.Len(r.destructiveUpdate, exp.destructive, "Expected Destructive")
assertion.Len(r.inplaceUpdate, exp.inplace, "Expected Inplace Updates")
assertion.Len(r.attributeUpdates, exp.attributeUpdates, "Expected Attribute Updates")
assertion.Len(r.reconnectUpdates, exp.reconnectUpdates, "Expected Reconnect Updates")
assertion.Len(r.disconnectUpdates, exp.disconnectUpdates, "Expected Disconnect Updates")
assertion.Len(r.stop, exp.stop, "Expected Stops")
assertion.EqualValues(exp.desiredTGUpdates, r.desiredTGUpdates, "Expected Desired TG Update Annotations")
}
func buildAllocations(job *structs.Job, count int, clientStatus, desiredStatus string, nodeScore float64) []*structs.Allocation {
allocs := make([]*structs.Allocation, 0)
for i := 0; i < count; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.ClientStatus = clientStatus
alloc.DesiredStatus = desiredStatus
alloc.Metrics = &structs.AllocMetric{
ScoreMetaData: []*structs.NodeScoreMeta{
{
NodeID: alloc.NodeID,
NormScore: nodeScore,
Scores: map[string]float64{
alloc.NodeID: nodeScore,
},
},
},
}
allocs = append(allocs, alloc)
}
return allocs
}
func buildDisconnectedNodes(allocs []*structs.Allocation, count int) map[string]*structs.Node {
tainted := make(map[string]*structs.Node, count)
for i := 0; i < count; i++ {
n := mock.Node()
n.ID = allocs[i].NodeID
n.Status = structs.NodeStatusDisconnected
tainted[n.ID] = n
}
return tainted
}
func buildResumableAllocations(count int, clientStatus, desiredStatus string, nodeScore float64) (*structs.Job, []*structs.Allocation) {
job := mock.Job()
job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Minute)
job.TaskGroups[0].Count = count
return job, buildAllocations(job, count, clientStatus, desiredStatus, nodeScore)
}
// Tests the reconciler properly handles placements for a job that has no
// existing allocations
func TestReconciler_Place_NoExisting(t *testing.T) {
@ -5169,3 +5222,233 @@ func TestReconciler_RescheduleNot_Batch(t *testing.T) {
},
})
}
// Tests that when a node disconnects, running allocations are queued to transition to unknown.
func TestReconciler_Node_Disconnect_Updates_Alloc_To_Unknown(t *testing.T) {
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
// Build a map of disconnected nodes
nodes := buildDisconnectedNodes(allocs, 2)
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()
// Verify that 1 follow up eval was created with the values we expect.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 1)
expectedTime := reconciler.now.Add(5 * time.Minute)
eval := evals[0]
require.NotNil(t, eval.WaitUntil)
require.Equal(t, expectedTime, eval.WaitUntil)
// Validate that the queued disconnectUpdates have the right client status,
// and that they have a valid FollowupEvalID.
for _, disconnectUpdate := range results.disconnectUpdates {
require.Equal(t, structs.AllocClientStatusUnknown, disconnectUpdate.ClientStatus)
require.NotEmpty(t, disconnectUpdate.FollowupEvalID)
require.Equal(t, eval.ID, disconnectUpdate.FollowupEvalID)
}
// 2 to place, 2 to update, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
stop: 0,
inplace: 0,
disconnectUpdates: 2,
// 2 to place and 1 to ignore
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 2,
Stop: 0,
Ignore: 1,
InPlaceUpdate: 0,
},
},
})
}
// Tests that when a node reconnects unknown allocations for that node are queued
// to resume on the client, and that any replacement allocations that were scheduled
// are queued to stop.
func TestReconciler_Node_Reconnect_ScaleIn_And_Reconnect_Unknown(t *testing.T) {
// TODO: Table tests
// * Some replacements have a higher nodes score
// * Scores are a tie
// * Canarying
// Create 2 resumable allocs with a node score of 2.
job, allocs := buildResumableAllocations(2, structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun, 2)
// Adjust the desired count on the job's Task group that got set in the helper.
job.TaskGroups[0].Count = 3
// Create 3 placed allocs with a lower nodeScore here.
scaleInAllocs := buildAllocations(job, 3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 1)
// 2 should scale in, since we are passing nil in tainted nodes. We pass the
// allocUpdateFnIgnore, because computeUpdates in a real setting should return
// ignore == true for the 1 remaining untainted update after computeStop
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, append(allocs, scaleInAllocs...), nil, "", 50)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()
// Verify that 0 follow up evals were created.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 0)
// Validate that the queued reconnectUpdates have the right client status,
// and that they have no FollowupEvalID.
for _, reconnectUpdate := range results.reconnectUpdates {
require.Equal(t, structs.AllocClientStatusUnknown, reconnectUpdate.ClientStatus)
require.Empty(t, reconnectUpdate.FollowupEvalID)
require.Equal(t, structs.AllocDesiredStatusRun, reconnectUpdate.DesiredStatus)
}
// 2 to stop, 2 reconnect updates, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
stop: 2,
destructive: 0,
inplace: 0,
disconnectUpdates: 0,
reconnectUpdates: 2,
// TODO: Figure out how this needs to change.
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
Stop: 2,
DestructiveUpdate: 0,
Ignore: 1,
InPlaceUpdate: 0,
},
},
})
}
// Tests that the future timeout evals that get created when a node disconnects
// stop once the duration passes.
func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testing.T) {
// TODO: Add table tests and play with the reconciler time/node status to make sure that
// if the expiration time has not passed, it's a no-op.
// Build a set of resumable allocations. Helper will set the timeout to 5 min.
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
// more discernible that only the affected alloc(s) get marked unknown.
nodes := buildDisconnectedNodes(allocs, 2)
// Invoke the reconciler to queue status changes and get the followup evals.
// Use the allocUpdateFnIgnore since alloc.TerminalStatus() will evaluate to
// false and cause the real genericAllocUpdateFn to return ignore=true destructive=false
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()
// Verify that 1 follow up eval was created.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 1)
eval := evals[0]
// Set the NodeStatus to Down on the 2 disconnected nodes to simulate that
// the resume duration has passed.
for _, node := range nodes {
node.Status = structs.NodeStatusDown
}
// Replace the allocs that were originally created with the updated copies that
// have the unknown ClientStatus.
for i, alloc := range allocs {
for id, updated := range results.disconnectUpdates {
if alloc.ID == id {
allocs[i] = updated
}
}
}
// Run the followup eval through the reconciler and verify the resumable allocs
// have timed out, will be stopped, and new placements are scheduled.
reconciler = NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, eval.ID, eval.Priority)
// Allocs were configured to expire in 5 min, so configure the reconciler
// to believe that time has passed.
// NOTE: this probably isn't really necessary because this value is really
// only used for computing future evals, but it seemed like good practice
// in case there are other unconsidered side effects.
reconciler.now = time.Now().UTC().Add(6 * time.Minute)
results = reconciler.Compute()
// Validate that the queued stops have the right client status.
for _, stopResult := range results.stop {
require.Equal(t, structs.AllocClientStatusLost, stopResult.clientStatus)
}
// 2 to place, 2 to stop, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
destructive: 0,
stop: 2,
inplace: 0,
disconnectUpdates: 0,
reconnectUpdates: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 2,
Stop: 2,
DestructiveUpdate: 0,
Ignore: 1,
InPlaceUpdate: 0,
},
},
})
}
func TestReconciler_Compute_Disconnecting(t *testing.T) {
// Build a set of resumable allocations. Helper will set the timeout to 5 min.
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
// more discernible that only the affected alloc(s) get marked unknown.
nodes := buildDisconnectedNodes(allocs, 2)
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50)
reconciler.now = time.Now().UTC()
tgName := allocs[0].TaskGroup
matrix := newAllocMatrix(job, allocs)
_, _, _, disconnecting, _ := matrix[tgName].filterByTainted(nodes)
require.NotNil(t, disconnecting)
require.Len(t, disconnecting, 2)
result := reconciler.createTimeoutLaterEvals(disconnecting, tgName)
require.NotNil(t, result)
require.Len(t, reconciler.result.desiredFollowupEvals, 1)
evals := reconciler.result.desiredFollowupEvals[tgName]
for _, eval := range evals {
found := false
for _, evalID := range result {
found = eval.ID == evalID
if found {
break
}
}
require.True(t, found)
}
}

View File

@ -209,14 +209,19 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
}
// filterByTainted takes a set of tainted nodes and filters the allocation set
// into three groups:
// into 5 groups:
// 1. Those that exist on untainted nodes
// 2. Those exist on nodes that are draining
// 3. Those that exist on lost nodes
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
// 4. Those that are on nodes that are disconnected, but have not had their ClientStatus set to unknown
// 5. Those that have had their ClientStatus set to unknown, but their node has reconnected.
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost, disconnecting, reconnecting allocSet) {
untainted = make(map[string]*structs.Allocation)
migrate = make(map[string]*structs.Allocation)
lost = make(map[string]*structs.Allocation)
disconnecting = make(map[string]*structs.Allocation)
reconnecting = make(map[string]*structs.Allocation)
for _, alloc := range a {
// Terminal allocs are always untainted as they should never be migrated
if alloc.TerminalStatus() {
@ -232,11 +237,41 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain
taintedNode, ok := taintedNodes[alloc.NodeID]
if !ok {
// Node is untainted so alloc is untainted
// Filter allocs on a node that is now re-connected to be resumed.
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
reconnecting[alloc.ID] = alloc
continue
}
// Otherwise, Node is untainted so alloc is untainted
untainted[alloc.ID] = alloc
continue
}
if taintedNode != nil {
// Group disconnecting/reconnecting
switch taintedNode.Status {
case structs.NodeStatusDisconnected:
// Filter running allocs on a node that is disconnected to be marked as unknown.
if alloc.ClientStatus == structs.AllocClientStatusRunning {
disconnecting[alloc.ID] = alloc
continue
}
// Filter pending allocs on a node that is disconnected to be marked as lost.
if alloc.ClientStatus == structs.AllocClientStatusPending {
lost[alloc.ID] = alloc
continue
}
case structs.NodeStatusReady:
// Filter unknown allocs on a node that is connected to reconnect.
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
reconnecting[alloc.ID] = alloc
continue
}
default:
}
}
// Allocs on GC'd (nil) or lost nodes are Lost
if taintedNode == nil || taintedNode.TerminalStatus() {
lost[alloc.ID] = alloc
@ -245,7 +280,9 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain
// All other allocs are untainted
untainted[alloc.ID] = alloc
}
return
}
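A condensed sketch of the two new buckets, complementing the table-style test further down. The node IDs, alloc IDs, and omitted fields are illustrative only; a real reconciler input would carry full Job and metrics data.

// exampleFilterByTainted is illustrative only: a running alloc on a
// disconnected node lands in the disconnecting set, and an unknown alloc
// whose node is no longer tainted lands in the reconnecting set.
func exampleFilterByTainted() (disconnecting, reconnecting allocSet) {
	nodes := map[string]*structs.Node{
		"dc-node": {ID: "dc-node", Status: structs.NodeStatusDisconnected},
	}
	set := allocSet{
		"a1": {ID: "a1", NodeID: "dc-node", ClientStatus: structs.AllocClientStatusRunning},
		"a2": {ID: "a2", NodeID: "ok-node", ClientStatus: structs.AllocClientStatusUnknown},
	}
	_, _, _, disconnecting, reconnecting = set.filterByTainted(nodes)
	return disconnecting, reconnecting // one alloc in each: a1 and a2
}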
@ -413,6 +450,26 @@ func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedReschedule
return later
}
// delayByMaxClientDisconnect returns a delay for any unknown allocation
// that has max_client_disconnect configured.
func (a allocSet) delayByMaxClientDisconnect(now time.Time) (later []*delayedRescheduleInfo, err error) {
for _, alloc := range a {
timeout := alloc.DisconnectTimeout(now)
if !timeout.After(now) {
continue
}
later = append(later, &delayedRescheduleInfo{
allocID: alloc.ID,
alloc: alloc,
rescheduleTime: timeout,
})
}
return
}
// allocNameIndex is used to select allocation names for placement or removal
// given an existing set of placed allocations.
type allocNameIndex struct {

View File

@ -39,8 +39,6 @@ func TestBitmapFrom(t *testing.T) {
func TestAllocSet_filterByTainted(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
nodes := map[string]*structs.Node{
"draining": {
ID: "draining",
@ -55,6 +53,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
ID: "normal",
Status: structs.NodeStatusReady,
},
"disconnected": {
ID: "disconnected",
Status: structs.NodeStatusDisconnected,
},
}
batchJob := &structs.Job{
@ -119,18 +121,108 @@ func TestAllocSet_filterByTainted(t *testing.T) {
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting1": {
ID: "disconnecting1",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting2": {
ID: "disconnecting2",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting3": {
ID: "disconnecting3",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Complete allocs on disconnected nodes don't get restarted
"disconnecting4": {
ID: "disconnecting4",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "disconnected",
},
// Failed allocs on disconnected nodes don't get restarted
"disconnecting5": {
ID: "disconnecting5",
ClientStatus: structs.AllocClientStatusFailed,
Job: batchJob,
NodeID: "disconnected",
},
// Lost allocs on disconnected nodes don't get restarted
"disconnecting6": {
ID: "disconnecting6",
ClientStatus: structs.AllocClientStatusLost,
Job: batchJob,
NodeID: "disconnected",
},
// Unknown allocs on re-connected nodes are reconnecting
"reconnecting1": {
ID: "reconnecting1",
ClientStatus: structs.AllocClientStatusUnknown,
Job: batchJob,
NodeID: "normal",
},
// Unknown allocs on re-connected nodes are reconnecting
"reconnecting2": {
ID: "reconnecting2",
ClientStatus: structs.AllocClientStatusUnknown,
Job: batchJob,
NodeID: "normal",
},
// Complete allocs on reconnected nodes don't get restarted
"reconnecting3": {
ID: "reconnecting3",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "normal",
},
// Failed allocs on reconnected nodes don't get restarted
"reconnecting4": {
ID: "reconnecting4",
ClientStatus: structs.AllocClientStatusFailed,
Job: batchJob,
NodeID: "normal",
},
// Lost allocs on reconnected nodes don't get restarted
"reconnecting5": {
ID: "reconnecting5",
ClientStatus: structs.AllocClientStatusLost,
Job: batchJob,
NodeID: "normal",
},
}
untainted, migrate, lost := allocs.filterByTainted(nodes)
require.Len(untainted, 4)
require.Contains(untainted, "untainted1")
require.Contains(untainted, "untainted2")
require.Contains(untainted, "untainted3")
require.Contains(untainted, "untainted4")
require.Len(migrate, 2)
require.Contains(migrate, "migrating1")
require.Contains(migrate, "migrating2")
require.Len(lost, 2)
require.Contains(lost, "lost1")
require.Contains(lost, "lost2")
untainted, migrate, lost, disconnecting, reconnecting := allocs.filterByTainted(nodes)
require.Len(t, untainted, 10)
require.Contains(t, untainted, "untainted1")
require.Contains(t, untainted, "untainted2")
require.Contains(t, untainted, "untainted3")
require.Contains(t, untainted, "untainted4")
require.Contains(t, untainted, "disconnecting4")
require.Contains(t, untainted, "disconnecting5")
require.Contains(t, untainted, "disconnecting6")
require.Contains(t, untainted, "reconnecting3")
require.Contains(t, untainted, "reconnecting4")
require.Contains(t, untainted, "reconnecting5")
require.Len(t, migrate, 2)
require.Contains(t, migrate, "migrating1")
require.Contains(t, migrate, "migrating2")
require.Len(t, lost, 2)
require.Contains(t, lost, "lost1")
require.Contains(t, lost, "lost2")
require.Len(t, disconnecting, 3)
require.Contains(t, disconnecting, "disconnecting1")
require.Contains(t, disconnecting, "disconnecting2")
require.Contains(t, disconnecting, "disconnecting3")
require.Len(t, reconnecting, 2)
require.Contains(t, reconnecting, "reconnecting1")
require.Contains(t, reconnecting, "reconnecting2")
}

View File

@ -350,8 +350,9 @@ func progressMade(result *structs.PlanResult) bool {
}
// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
// All the nodes returned in the map are tainted.
// underlying nodes are tainted, and should force a migration of the allocation,
// or if the underlying nodes are disconnected, and should be used to calculate
// the reconnect timeout of its allocations. All the nodes returned in the map are tainted.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
out := make(map[string]*structs.Node)
for _, alloc := range allocs {
@ -373,7 +374,15 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil {
out[alloc.NodeID] = node
}
// Disconnected nodes are included in the tainted set so that the
// MaxClientDisconnect configuration of the task groups placed on them
// can be included in the timeout calculation.
if node.Status == structs.NodeStatusDisconnected {
out[alloc.NodeID] = node
}
}
return out, nil
}
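Worth noting: ShouldDrainNode returns false for the new disconnected status, so without the extra branch above a disconnected node would never make it into the tainted map at all. A tiny illustrative check; the function name and node values are made up.

// Illustrative only: disconnected nodes do not drain, but they are still
// returned by taintedNodes so filterByTainted can later route their
// allocations into the disconnecting/reconnecting sets.
func disconnectedIsTaintedButNotDraining() bool {
	node := &structs.Node{ID: "n1", Status: structs.NodeStatusDisconnected}
	// false; the branch above still adds the node to the tainted map.
	return structs.ShouldDrainNode(node.Status)
}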