disconnected clients: Support operator manual interventions (#12436)

* allocrunner: Remove Shutdown call in Reconnect
* Node.UpdateAlloc: Stop orphaned allocs.
* reconciler: Stop failed reconnects.
* Apply feedback from code review. Handle rebase conflict.
* Apply suggestions from code review

Co-authored-by: Tim Gross <tgross@hashicorp.com>
Authored by Derek Strickland on 2022-04-06 09:33:32 -04:00, committed by GitHub
parent 7405ebbad1
commit d1d6009e2c
7 changed files with 367 additions and 184 deletions


@ -784,9 +784,8 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}
// setIndexes is a helper for forcing a set of server side indexes
// on the alloc runner. This is used during reconnect when the task
// has been marked unknown by the server.
// setIndexes is a helper for forcing alloc state on the alloc runner. This is
// used during reconnect when the task has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
@ -1253,15 +1252,13 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
// Update the client alloc with the server client side indexes.
// Update the client alloc with the server side indexes.
ar.setIndexes(update)
// Calculate alloc state to get the final state with the new events.
@ -1274,12 +1271,6 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
// Build the client allocation
alloc := ar.clientAlloc(states)
// Don't destroy until after we've appended the reconnect event.
if update.DesiredStatus != structs.AllocDesiredStatusRun {
ar.Shutdown()
return
}
// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
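
With the early Shutdown removed, a reconnecting alloc whose desired status is no longer run still records its reconnect events and syncs state here; the actual stop is left to the client's normal alloc-update handling, which the server-side changes below now drive. For orientation, a self-contained sketch of the sequence this function performs, using simplified stand-in types rather than the real allocRunner, and assuming setIndexes simply copies the server-side indexes onto the local alloc:

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for structs.Allocation and the task runners; the type
// and field names here are illustrative, not the real Nomad types.
type taskEvent struct {
    Type string
    Time int64
}

type taskRunner struct {
    events []taskEvent
}

func (tr *taskRunner) AppendEvent(ev taskEvent) { tr.events = append(tr.events, ev) }

type allocation struct {
    ID               string
    AllocModifyIndex uint64
    ModifyIndex      uint64
    ModifyTime       int64
}

type allocRunner struct {
    alloc *allocation
    tasks map[string]*taskRunner
}

// reconnect mirrors the sequence in the hunk above: record a reconnect event
// on every task, adopt the server-side indexes (the role setIndexes appears to
// play), then persist and report state. There is no early Shutdown for
// non-run desired statuses anymore.
func (ar *allocRunner) reconnect(update *allocation) error {
    ev := taskEvent{Type: "client reconnected", Time: time.Now().UnixNano()}
    for _, tr := range ar.tasks {
        tr.AppendEvent(ev)
    }

    ar.alloc.AllocModifyIndex = update.AllocModifyIndex
    ar.alloc.ModifyIndex = update.ModifyIndex
    ar.alloc.ModifyTime = update.ModifyTime

    // The real runner recomputes task states, writes the alloc to the client
    // state store, and queues an update back to the server at this point.
    return nil
}

func main() {
    ar := &allocRunner{
        alloc: &allocation{ID: "alloc-1"},
        tasks: map[string]*taskRunner{"web": {}},
    }
    update := &allocation{ID: "alloc-1", AllocModifyIndex: 42, ModifyIndex: 42, ModifyTime: time.Now().UnixNano()}
    if err := ar.reconnect(update); err != nil {
        panic(err)
    }
    fmt.Println("reconnect events recorded:", len(ar.tasks["web"].events))
}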


@ -1151,6 +1151,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
var evals []*structs.Evaluation
for _, allocToUpdate := range args.Alloc {
evalTriggerBy := ""
allocToUpdate.ModifyTime = now.UTC().UnixNano()
alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
@ -1162,51 +1163,73 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}
// if the job has been purged, this will always return error
job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
var job *structs.Job
var jobType string
var jobPriority int
job, err = n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
continue
}
// If the job is nil it means it has been de-registered.
if job == nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
continue
jobType = alloc.Job.Type
jobPriority = alloc.Job.Priority
evalTriggerBy = structs.EvalTriggerJobDeregister
allocToUpdate.DesiredStatus = structs.AllocDesiredStatusStop
n.logger.Debug("UpdateAlloc unable to find job - shutting down alloc", "job", alloc.JobID)
}
taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
if taskGroup == nil {
continue
var taskGroup *structs.TaskGroup
if job != nil {
jobType = job.Type
jobPriority = job.Priority
taskGroup = job.LookupTaskGroup(alloc.TaskGroup)
}
// If we cannot find the task group for a failed alloc we cannot continue, unless it is an orphan.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" {
if taskGroup == nil {
n.logger.Debug("UpdateAlloc unable to find task group for job", "job", alloc.JobID, "alloc", alloc.ID, "task_group", alloc.TaskGroup)
continue
}
// Set trigger by failed if not an orphan.
if alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
}
}
evalTriggerBy := ""
var eval *structs.Evaluation
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" &&
alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
}
//Add an evaluation if this is a reconnecting allocation.
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
// If unknown, and not an orphan, set the trigger by.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
alloc.ClientStatus == structs.AllocClientStatusUnknown {
evalTriggerBy = structs.EvalTriggerReconnect
}
if evalTriggerBy != "" {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
// If we weren't able to determine one of our expected eval triggers,
// continue and don't create an eval.
if evalTriggerBy == "" {
continue
}
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: jobType,
Priority: jobPriority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
}
// Add this to the batch
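
Read end to end, the hunk above picks at most one eval trigger per updated alloc: an orphaned job wins, then a reschedulable client-side failure, then a reconnect for an alloc the server had marked unknown; anything else creates no eval. A hedged restatement of that ordering as a standalone function over simplified boolean inputs (not the real structs or RPC plumbing):

package main

import "fmt"

// Illustrative trigger names; the real code uses the structs.EvalTrigger* constants.
const (
    triggerJobDeregister = "job-deregister"
    triggerRetryFailed   = "retry-failed-alloc"
    triggerReconnect     = "reconnect"
    triggerNone          = ""
)

// evalTrigger mirrors the order of checks above. jobExists is false for an
// orphaned alloc whose job has been purged; reschedulable stands in for
// alloc.RescheduleEligible against the task group's reschedule policy.
func evalTrigger(jobExists, clientReportedFailed, hasFollowupEval, serverStatusUnknown, reschedulable bool) string {
    trigger := triggerNone

    // Orphan: the job is gone, so the alloc is marked for stop and a
    // deregister-style eval is created.
    if !jobExists {
        trigger = triggerJobDeregister
    }

    // Failed on the client with no follow-up eval yet: retry if the
    // reschedule policy still allows it.
    if trigger != triggerJobDeregister && clientReportedFailed && !hasFollowupEval && reschedulable {
        trigger = triggerRetryFailed
    }

    // The server had marked the alloc unknown while the client was away:
    // create a reconnect eval so the reconciler can sort out replacements.
    if trigger != triggerJobDeregister && serverStatusUnknown {
        trigger = triggerReconnect
    }

    return trigger // empty means no eval is created for this update
}

func main() {
    fmt.Println(evalTrigger(false, false, false, false, false)) // job-deregister
    fmt.Println(evalTrigger(true, true, false, false, true))    // retry-failed-alloc
    fmt.Println(evalTrigger(true, false, false, true, false))   // reconnect
    fmt.Println(evalTrigger(true, false, false, false, false))  // "" (no eval)
}
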
@ -1254,6 +1277,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
// batchUpdate is used to update all the allocations
func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
var mErr multierror.Error
// Group pending evals by jobID to prevent creating unnecessary evals
evalsByJobId := make(map[structs.NamespacedID]struct{})
var trimmedEvals []*structs.Evaluation
@ -1283,7 +1307,6 @@ func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Alloc
}
// Commit this update via Raft
var mErr multierror.Error
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
n.logger.Error("alloc update failed", "error", err)
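
batchUpdate now groups pending evals by job before committing the batch. The grouping loop itself is not shown in this hunk, but the evalsByJobId and trimmedEvals names suggest the usual keep-first-per-key pattern; a sketch under that assumption, with simplified types:

package main

import "fmt"

type evaluation struct {
    Namespace   string
    JobID       string
    TriggeredBy string
}

type namespacedID struct {
    Namespace string
    ID        string
}

// dedupeEvalsByJob keeps one pending eval per (namespace, job): a batch of
// client alloc updates for the same job only needs a single evaluation.
func dedupeEvalsByJob(evals []*evaluation) []*evaluation {
    seen := make(map[namespacedID]struct{})
    trimmed := make([]*evaluation, 0, len(evals))
    for _, eval := range evals {
        key := namespacedID{Namespace: eval.Namespace, ID: eval.JobID}
        if _, ok := seen[key]; ok {
            continue // already queued an eval for this job in this batch
        }
        seen[key] = struct{}{}
        trimmed = append(trimmed, eval)
    }
    return trimmed
}

func main() {
    evals := []*evaluation{
        {Namespace: "default", JobID: "web", TriggeredBy: "reconnect"},
        {Namespace: "default", JobID: "web", TriggeredBy: "reconnect"},
        {Namespace: "default", JobID: "batch", TriggeredBy: "retry-failed-alloc"},
    }
    fmt.Println(len(dedupeEvalsByJob(evals))) // 2
}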


@ -3765,3 +3765,178 @@ func TestClientEndpoint_ShouldCreateNodeEval(t *testing.T) {
})
}
}
func TestClientEndpoint_UpdateAlloc_Evals_ByTrigger(t *testing.T) {
t.Parallel()
type testCase struct {
name string
clientStatus string
serverClientStatus string
triggerBy string
missingJob bool
missingAlloc bool
invalidTaskGroup bool
}
testCases := []testCase{
{
name: "failed-alloc",
clientStatus: structs.AllocClientStatusFailed,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: structs.EvalTriggerRetryFailedAlloc,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerReconnect,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "orphaned-unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerJobDeregister,
missingJob: true,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "running-job",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "complete-job",
clientStatus: structs.AllocClientStatusComplete,
serverClientStatus: structs.AllocClientStatusComplete,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "no-alloc-at-server",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: "",
triggerBy: "",
missingJob: false,
missingAlloc: true,
invalidTaskGroup: false,
},
{
name: "invalid-task-group",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
// Disabling scheduling in this test so that we can
// ensure that the state store doesn't accumulate more evals
// than what we expect the unit test to add
c.NumSchedulers = 0
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var nodeResp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeResp)
require.NoError(t, err)
fsmState := s1.fsm.State()
job := mock.Job()
job.ID = tc.name + "-test-job"
if !tc.missingJob {
err = fsmState.UpsertJob(structs.MsgTypeTestSetup, 101, job)
require.NoError(t, err)
}
serverAlloc := mock.Alloc()
serverAlloc.JobID = job.ID
serverAlloc.NodeID = node.ID
serverAlloc.ClientStatus = tc.serverClientStatus
serverAlloc.TaskGroup = job.TaskGroups[0].Name
// Create the incoming client alloc.
clientAlloc := serverAlloc.Copy()
clientAlloc.ClientStatus = tc.clientStatus
err = fsmState.UpsertJobSummary(99, mock.JobSummary(serverAlloc.JobID))
require.NoError(t, err)
if tc.invalidTaskGroup {
serverAlloc.TaskGroup = "invalid"
}
if !tc.missingAlloc {
err = fsmState.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{serverAlloc})
require.NoError(t, err)
}
updateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeAllocResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", updateReq, &nodeAllocResp)
require.NoError(t, err)
require.NotEqual(t, uint64(0), nodeAllocResp.Index)
// If no eval should be created, validate that none were and return.
if tc.triggerBy == "" {
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Len(t, evaluations, 0)
return
}
// Lookup the alloc
updatedAlloc, err := fsmState.AllocByID(nil, serverAlloc.ID)
require.NoError(t, err)
require.Equal(t, tc.clientStatus, updatedAlloc.ClientStatus)
// Assert that exactly one eval with test case TriggeredBy exists
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Equal(t, 1, len(evaluations))
foundCount := 0
for _, resultEval := range evaluations {
if resultEval.TriggeredBy == tc.triggerBy && resultEval.WaitUntil.IsZero() {
foundCount++
}
}
require.Equal(t, 1, foundCount, "Should create exactly one eval for trigger by %s", tc.triggerBy)
})
}
}


@ -440,13 +440,14 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@ -697,7 +698,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
}
// Add replacements for disconnected and lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting)
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
// Add replacements for lost
for _, alloc := range lost {
@ -912,13 +913,19 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
// Mark all lost allocations for stop.
var stop allocSet
stop = stop.union(lost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
reconnecting = reconnecting.difference(failedReconnects)
// If we are still deploying or creating canaries, don't stop them
if isCanarying {
untainted = untainted.difference(canaries)
@ -927,7 +934,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
if remove <= 0 {
return stop
return stop, reconnecting
}
// Filter out any terminal allocations from the untainted set
@ -949,7 +956,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
@ -973,7 +980,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
@ -982,7 +989,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
if len(reconnecting) != 0 {
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop
return stop, reconnecting
}
}
@ -999,7 +1006,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
@ -1016,11 +1023,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
return stop
return stop, reconnecting
}
// computeStopByReconnecting moves allocations from either the untainted or reconnecting
@ -1176,6 +1183,11 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
continue
}
// If the alloc has failed don't reconnect.
if alloc.ClientStatus != structs.AllocClientStatusRunning {
continue
}
a.result.reconnectUpdates[alloc.ID] = alloc
}
}
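
The reconciler changes above lean on allocSet, a map of allocations keyed by ID with small set operations, plus the new filterByFailedReconnect helper added later in this commit. A condensed, self-contained sketch of that data structure and of how computeStop now peels failed reconnects out of the reconnecting set (simplified fields; the string "failed" stands in for structs.AllocClientStatusFailed):

package main

import "fmt"

// A pared-down version of the reconciler's allocSet and the operations this
// change relies on; not the real Nomad types.
type allocation struct {
    ID                   string
    ClientStatus         string
    ServerTerminalStatus bool
}

type allocSet map[string]*allocation

func (a allocSet) union(others ...allocSet) allocSet {
    out := make(allocSet, len(a))
    for id, alloc := range a {
        out[id] = alloc
    }
    for _, other := range others {
        for id, alloc := range other {
            out[id] = alloc
        }
    }
    return out
}

func (a allocSet) difference(others ...allocSet) allocSet {
    out := make(allocSet)
OUTER:
    for id, alloc := range a {
        for _, other := range others {
            if _, ok := other[id]; ok {
                continue OUTER
            }
        }
        out[id] = alloc
    }
    return out
}

// filterByFailedReconnect mirrors the helper added in this change: allocs that
// failed on the client but are not yet terminal on the server.
func (a allocSet) filterByFailedReconnect() allocSet {
    failed := make(allocSet)
    for id, alloc := range a {
        if !alloc.ServerTerminalStatus && alloc.ClientStatus == "failed" {
            failed[id] = alloc
        }
    }
    return failed
}

func main() {
    reconnecting := allocSet{
        "a1": {ID: "a1", ClientStatus: "running"},
        "a2": {ID: "a2", ClientStatus: "failed"},
    }
    stop := allocSet{}

    // The shape of the new computeStop step: failed reconnects are marked for
    // stop and removed from the reconnecting set before anything else runs.
    failedReconnects := reconnecting.filterByFailedReconnect()
    stop = stop.union(failedReconnects)
    reconnecting = reconnecting.difference(failedReconnects)

    fmt.Println(len(stop), len(reconnecting)) // 1 1
}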


@ -5282,6 +5282,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
jobVersionIncrement uint64
nodeScoreIncrement float64
disconnectedAllocStatus string
serverDesiredStatus string
isBatch bool
nodeStatusDisconnected bool
replace bool
@ -5298,6 +5299,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
reconnectUpdates: 2,
@ -5314,6 +5316,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
stop: 1,
@ -5332,6 +5335,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeScoreIncrement: 1,
expected: &resultExpectation{
@ -5345,15 +5349,18 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
},
},
{
name: "ignore-original-failed-if-replaced",
name: "stop-original-failed-on-reconnect",
allocCount: 4,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 2,
Ignore: 4,
},
},
@ -5365,6 +5372,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 2,
@ -5384,6 +5392,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusComplete,
serverDesiredStatus: structs.AllocDesiredStatusRun,
isBatch: true,
expected: &resultExpectation{
desiredTGUpdates: map[string]*structs.DesiredUpdates{
@ -5399,6 +5408,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
@ -5417,6 +5427,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
@ -5435,6 +5446,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
failReplacement: true,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
@ -5454,6 +5466,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusPending,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
expected: &resultExpectation{
@ -5472,6 +5485,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusUnknown,
serverDesiredStatus: structs.AllocDesiredStatusRun,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
maxDisconnect: helper.TimeToPtr(2 * time.Second),
@ -5491,6 +5505,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
serverDesiredStatus: structs.AllocDesiredStatusRun,
nodeStatusDisconnected: true,
expected: &resultExpectation{
place: 2,
@ -5524,6 +5539,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
// Set alloc state
disconnectedAllocCount := tc.disconnectedAllocCount
for _, alloc := range allocs {
alloc.DesiredStatus = tc.serverDesiredStatus
if tc.maxDisconnect != nil {
alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect
}
@ -5600,6 +5617,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
}
results := reconciler.Compute()
assertResults(t, results, tc.expected)
for _, stopResult := range results.stop {
if tc.shouldStopOnDisconnectedNode {
@ -5610,91 +5628,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
require.Equal(t, job.Version, stopResult.alloc.Job.Version)
}
assertResults(t, results, tc.expected)
})
}
}
// Tests that the future timeout evals that get created when a node disconnects
// stop once the duration passes.
func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testing.T) {
// TODO: Add table tests and play with the reconciler time/node status to make sure that
// if the expiration time has not passed, it's a no-op.
// Build a set of resumable allocations. Helper will set the timeout to 5 min.
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
// more discernible that only the affected alloc(s) get marked unknown.
nodes := buildDisconnectedNodes(allocs, 2)
// Invoke the reconciler to queue status changes and get the followup evals.
// Use the allocUpdateFnIgnore since alloc.TerminalStatus() will evaluate to
// false and cause the real genericAllocUpdateFn to return ignore=true destructive=false
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50, true)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()
// Verify that 1 follow up eval was created.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 1)
eval := evals[0]
// Set the NodeStatus to Down on the 2 disconnected nodes to simulate that
// the resume duration has passed.
for _, node := range nodes {
node.Status = structs.NodeStatusDown
}
// Replace the allocs that were originally created with the updated copies that
// have the unknown ClientStatus.
for i, alloc := range allocs {
for id, updated := range results.disconnectUpdates {
if alloc.ID == id {
allocs[i] = updated
}
}
}
// Run the followup eval through the reconciler and verify the resumable allocs
// have timed out, will be stopped, and new placements are scheduled.
reconciler = NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, eval.ID, eval.Priority, true)
// Allocs were configured to expire in 5 min, so configure the reconciler
// to believe that time has passed.
// NOTE: this probably isn't really necessary because this value is really
// only used for computing future evals, but it seemed like good practice
// in case there are other unconsidered side effects.
reconciler.now = time.Now().UTC().Add(6 * time.Minute)
results = reconciler.Compute()
// Validate that the queued stops have the right client status.
for _, stopResult := range results.stop {
require.Equal(t, structs.AllocClientStatusLost, stopResult.clientStatus)
}
// 2 to place, 2 to stop, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
destructive: 0,
stop: 2,
inplace: 0,
disconnectUpdates: 0,
reconnectUpdates: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 2,
Stop: 2,
DestructiveUpdate: 0,
Ignore: 1,
InPlaceUpdate: 0,
},
},
})
}


@ -225,8 +225,30 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
ignore = make(map[string]*structs.Allocation)
for _, alloc := range a {
// Terminal allocs are always untainted as they should never be migrated.
if alloc.TerminalStatus() {
reconnected := false
expired := false
// Only compute reconnected for unknown, running, and failed since they need to go through the reconnect logic.
if supportsDisconnectedClients &&
(alloc.ClientStatus == structs.AllocClientStatusUnknown ||
alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusFailed) {
reconnected, expired = alloc.Reconnected()
}
// Failed reconnected allocs need to be added to reconnecting so that they
// can be handled as a failed reconnect.
if supportsDisconnectedClients &&
reconnected &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
alloc.ClientStatus == structs.AllocClientStatusFailed {
reconnecting[alloc.ID] = alloc
continue
}
// Terminal allocs, if not reconnected, are always untainted as they
// should never be migrated.
if alloc.TerminalStatus() && !reconnected {
untainted[alloc.ID] = alloc
continue
}
@ -243,8 +265,19 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
continue
}
// Ignore unknown allocs
if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown {
// Ignore unknown allocs that we want to reconnect eventually.
if supportsDisconnectedClients &&
alloc.ClientStatus == structs.AllocClientStatusUnknown &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun {
ignore[alloc.ID] = alloc
continue
}
// Ignore reconnected failed allocs that have been marked stop by the server.
if supportsDisconnectedClients &&
reconnected &&
alloc.ClientStatus == structs.AllocClientStatusFailed &&
alloc.DesiredStatus == structs.AllocDesiredStatusStop {
ignore[alloc.ID] = alloc
continue
}
@ -252,7 +285,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
taintedNode, ok := taintedNodes[alloc.NodeID]
if !ok {
// Filter allocs on a node that is now re-connected to be resumed.
reconnected, expired := alloc.Reconnected()
if reconnected {
if expired {
lost[alloc.ID] = alloc
@ -283,7 +315,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
}
case structs.NodeStatusReady:
// Filter reconnecting allocs with replacements on a node that is now connected.
reconnected, expired := alloc.Reconnected()
if reconnected {
if expired {
lost[alloc.ID] = alloc
@ -461,6 +492,19 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
return
}
// filterByFailedReconnect filters allocations into a set of those that have
// failed on the client but do not have a terminal status at the server, so
// that they can be marked as stop at the server.
func (a allocSet) filterByFailedReconnect() allocSet {
failed := make(allocSet)
for _, alloc := range a {
if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {
failed[alloc.ID] = alloc
}
}
return failed
}
// delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a
// stop_after_client_disconnect configured
func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
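
For reconnect purposes, the new filterByTainted branches earlier in this file reduce to a small decision over whether the alloc reconnected, what the client reported, and what the server wants. A sketch covering only those branches (the real function also sorts allocs into migrate, disconnecting, and lost, which is omitted here):

package main

import "fmt"

// Bucket names used by the reconciler's filterByTainted.
const (
    bucketReconnecting = "reconnecting"
    bucketIgnore       = "ignore"
    bucketUntainted    = "untainted"
    bucketOther        = "handled by the remaining filterByTainted logic"
)

// classifyReconnect covers only the branches added in this change.
func classifyReconnect(reconnected, clientFailed, clientUnknown, terminal bool, desiredStatus string) string {
    // Failed on the client but reconnected and still desired to run: send it
    // through the reconnect path so the server can mark it stopped.
    if reconnected && clientFailed && desiredStatus == "run" {
        return bucketReconnecting
    }
    // Terminal allocs that did not just reconnect stay untainted.
    if terminal && !reconnected {
        return bucketUntainted
    }
    // Unknown allocs the server still wants running are left alone until the
    // client reconnects or the disconnect window expires.
    if clientUnknown && desiredStatus == "run" {
        return bucketIgnore
    }
    // A failed reconnect the server has already marked stop needs no action.
    if reconnected && clientFailed && desiredStatus == "stop" {
        return bucketIgnore
    }
    return bucketOther
}

func main() {
    fmt.Println(classifyReconnect(true, true, false, false, "run"))  // reconnecting
    fmt.Println(classifyReconnect(true, true, false, true, "stop"))  // ignore
    fmt.Println(classifyReconnect(false, false, false, true, "run")) // untainted
}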


@ -292,6 +292,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
TaskGroup: "web",
PreviousAllocation: "failed-original",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"failed-original": {
ID: "failed-original",
Name: "web",
@ -303,11 +307,8 @@ func TestAllocSet_filterByTainted(t *testing.T) {
TaskStates: reconnectTaskState,
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-reconnecting-running-no-replacement",
@ -366,10 +367,11 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
// Failed allocs on reconnected nodes that are complete are untainted
"untainted-reconnect-failed": {
ID: "untainted-reconnect-failed",
Name: "untainted-reconnect-failed",
// Failed allocs on reconnected nodes are in reconnecting so that
// they can be marked with desired status stop at the server.
"reconnecting-failed": {
ID: "reconnecting-failed",
Name: "reconnecting-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
@ -408,7 +410,7 @@ func TestAllocSet_filterByTainted(t *testing.T) {
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-failed",
PreviousAllocation: "reconnecting-failed",
},
// Lost replacement allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost-replacement": {
@ -433,16 +435,6 @@ func TestAllocSet_filterByTainted(t *testing.T) {
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-failed": {
ID: "untainted-reconnect-failed",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
@ -471,7 +463,7 @@ func TestAllocSet_filterByTainted(t *testing.T) {
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-failed",
PreviousAllocation: "reconnecting-failed",
},
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
@ -486,9 +478,20 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
reconnecting: allocSet{
"reconnecting-failed": {
ID: "reconnecting-failed",
Name: "reconnecting-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-disconnect",
@ -508,13 +511,14 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
// Unknown allocs on disconnected nodes are lost when expired
"lost-unknown": {
@ -543,13 +547,14 @@ func TestAllocSet_filterByTainted(t *testing.T) {
ignore: allocSet{
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
},
lost: allocSet{