drain: use client status to determine drain is complete (#14348)

If an allocation is slow to stop because of `kill_timeout` or `shutdown_delay`,
the node drain is marked as complete prematurely, even though drain monitoring
will continue to report allocation migrations. This impacts the UI or API
clients that monitor node draining to shut down nodes.

This changeset updates the behavior to wait until the client status of all
drained allocs is terminal before marking the node as done draining.
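As a rough sketch of the new check (the standalone helper below is illustrative only; the real logic lives in the drainer's `IsDone` method and in `handleTaskGroup`, shown in the diffs that follow):

```go
package sketch

import "github.com/hashicorp/nomad/nomad/structs"

// isDrainDone is an illustrative stand-in for the drainer's IsDone check.
// Before this change the loop used alloc.TerminalStatus(), which flips as
// soon as the server marks the alloc for stopping; ClientTerminalStatus()
// waits for the client to actually report that the alloc finished.
func isDrainDone(allocs []*structs.Allocation) bool {
	for _, alloc := range allocs {
		if !alloc.ClientTerminalStatus() {
			return false // at least one alloc is still running on the client
		}
	}
	return true
}
```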
Tim Gross, 2023-04-13 08:55:28 -04:00 (committed via GitHub)
parent 79c521e570 · commit 5a9abdc469
7 changed files with 367 additions and 221 deletions

.changelog/14348.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
drain: Fixed a bug where drains would complete based on the server status and not the client status of an allocation
```

@@ -0,0 +1,43 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

job "drain_killtimeout" {

  constraint {
    attribute = "${attr.kernel.name}"
    value     = "linux"
  }

  group "group" {

    task "task" {
      driver = "docker"

      kill_timeout = "30s" # matches the agent's max_kill_timeout

      config {
        image   = "busybox:1"
        command = "/bin/sh"
        args    = ["local/script.sh"]
      }

      # this job traps SIGINT so that we can assert that we've forced the drain
      # to wait until the client status has been updated
      template {
        data = <<EOF
#!/bin/sh
trap 'sleep 60' 2
sleep 600
EOF

        destination = "local/script.sh"
        change_mode = "noop"
      }

      resources {
        cpu    = 256
        memory = 128
      }
    }
  }
}

@@ -30,6 +30,7 @@ func TestNodeDrain(t *testing.T) {
	t.Run("IgnoreSystem", testIgnoreSystem)
	t.Run("EphemeralMigrate", testEphemeralMigrate)
	t.Run("KeepIneligible", testKeepIneligible)
+	t.Run("KillTimeout", testKillTimeout)
	t.Run("DeadlineFlag", testDeadlineFlag)
	t.Run("ForceFlag", testForceFlag)
}
@@ -184,6 +185,62 @@ func testKeepIneligible(t *testing.T) {
	}
}

+// testKillTimeout tests that we block drains until the client status has been
+// updated, not the server status.
+func testKillTimeout(t *testing.T) {
+	nomadClient := e2eutil.NomadClient(t)
+	t.Cleanup(cleanupDrainState(t))
+
+	jobID := "test-node-drain-" + uuid.Short()
+
+	must.NoError(t, e2eutil.Register(jobID, "./input/drain_killtimeout.nomad"))
+	allocs := waitForRunningAllocs(t, nomadClient, jobID, 1)
+
+	t.Cleanup(cleanupJobState(t, jobID))
+	oldAllocID := allocs[0].ID
+	oldNodeID := allocs[0].NodeID
+
+	t.Logf("draining node %v", oldNodeID)
+	out, err := e2eutil.Command(
+		"nomad", "node", "drain",
+		"-enable", "-yes", "-detach", oldNodeID)
+	must.NoError(t, err, must.Sprintf("'nomad node drain %v' failed: %v\n%v", oldNodeID, err, out))
+
+	// the job will hang with kill_timeout for up to 30s, so we want to assert
+	// that we don't complete draining before that window expires. But we also
+	// can't guarantee we've started this assertion with exactly 30s left on the
+	// clock, so cut the deadline close without going over to avoid test
+	// flakiness
+	t.Log("waiting for kill_timeout to expire")
+	must.Wait(t, wait.ContinualSuccess(
+		wait.BoolFunc(func() bool {
+			node, _, err := nomadClient.Nodes().Info(oldNodeID, nil)
+			must.NoError(t, err)
+			return node.DrainStrategy != nil
+		}),
+		wait.Timeout(time.Second*25),
+		wait.Gap(500*time.Millisecond),
+	))
+
+	// the allocation will then get force-killed, so wait for the alloc to
+	// eventually be migrated and for the node's drain to be complete
+	t.Log("waiting for migration to complete")
+	newAllocs := waitForAllocDrainComplete(t, nomadClient, jobID,
+		oldAllocID, oldNodeID, time.Second*60)
+	must.Len(t, 1, newAllocs, must.Sprint("expected 1 new alloc"))
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.BoolFunc(func() bool {
+			node, _, err := nomadClient.Nodes().Info(oldNodeID, nil)
+			must.NoError(t, err)
+			return node.DrainStrategy == nil
+		}),
+		wait.Timeout(time.Second*5),
+		wait.Gap(500*time.Millisecond),
+	))
+}
+
// testDeadlineFlag tests the enforcement of the node drain deadline so that
// allocations are moved even if max_parallel says we should be waiting
func testDeadlineFlag(t *testing.T) {

@@ -76,7 +76,7 @@ func (n *drainingNode) IsDone() (bool, error) {
		}

		// If there is a non-terminal we aren't done
-		if !alloc.TerminalStatus() {
+		if !alloc.ClientTerminalStatus() {
			return false, nil
		}
	}
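For reference, the two allocation helpers differ roughly as follows (paraphrased and simplified from `nomad/structs`; the constant values are the real status strings, but the function bodies here are an illustration, not the verbatim implementation):

```go
package sketch

// Simplified constants, matching the status strings used in nomad/structs.
const (
	AllocDesiredStatusStop  = "stop"
	AllocDesiredStatusEvict = "evict"

	AllocClientStatusComplete = "complete"
	AllocClientStatusFailed   = "failed"
	AllocClientStatusLost     = "lost"
)

// terminalStatus mirrors Allocation.TerminalStatus: true once either the
// server-side desired status or the client-reported status is terminal, so it
// flips as soon as the scheduler decides to stop the alloc.
func terminalStatus(desiredStatus, clientStatus string) bool {
	switch desiredStatus {
	case AllocDesiredStatusStop, AllocDesiredStatusEvict:
		return true
	}
	return clientTerminalStatus(clientStatus)
}

// clientTerminalStatus mirrors Allocation.ClientTerminalStatus: true only
// once the client reports the alloc as complete, failed, or lost.
func clientTerminalStatus(clientStatus string) bool {
	switch clientStatus {
	case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
		return true
	default:
		return false
	}
}
```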

@@ -367,10 +367,10 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGroup,
		}

		// Check if the alloc should be considered migrated. A migrated
-		// allocation is one that is terminal, is on a draining
-		// allocation, and has only happened since our last handled index to
+		// allocation is one that is terminal on the client, is on a draining
+		// allocation, and has been updated since our last handled index to
		// avoid emitting many duplicate migrate events.
-		if alloc.TerminalStatus() &&
+		if alloc.ClientTerminalStatus() &&
			onDrainingNode &&
			alloc.ModifyIndex > lastHandledIndex {
			result.migrated = append(result.migrated, alloc)
@@ -385,8 +385,8 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGroup,

		// An alloc can't be considered for migration if:
		// - It isn't on a draining node
-		// - It is already terminal
-		if !onDrainingNode || alloc.TerminalStatus() {
+		// - It is already terminal on the client
+		if !onDrainingNode || alloc.ClientTerminalStatus() {
			continue
		}

@@ -8,6 +8,11 @@ import (
	"testing"
	"time"

+	"github.com/shoenig/test"
+	"github.com/shoenig/test/must"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/time/rate"
+
	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/helper/testlog"
@@ -15,9 +20,6 @@ import (
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"golang.org/x/time/rate"
)

func testNodes(t *testing.T, state *state.StateStore) (drainingNode, runningNode *structs.Node) {
@@ -106,12 +108,11 @@ func assertJobWatcherOps(t *testing.T, jw DrainingJobWatcher, drained, migrated
// allocation changes from multiple jobs.
func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
	ci.Parallel(t)
-	require := require.New(t)

-	state := state.TestStateStore(t)
-	jobWatcher, cancelWatcher := testDrainingJobWatcher(t, state)
+	store := state.TestStateStore(t)
+	jobWatcher, cancelWatcher := testDrainingJobWatcher(t, store)
	defer cancelWatcher()

-	drainingNode, runningNode := testNodes(t, state)
+	drainingNode, runningNode := testNodes(t, store)

	var index uint64 = 101
	count := 8
@@ -134,7 +135,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
		jnss[i] = structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
		job.TaskGroups[0].Migrate.MaxParallel = 3
		job.TaskGroups[0].Count = count
-		require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+		must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
		index++

		var allocs []*structs.Allocation
@@ -146,7 +147,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
			allocs = append(allocs, a)
		}

-		require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs))
+		must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, allocs))
		index++
	}
@@ -168,7 +169,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
		// create a copy so we can reuse this slice
		drainedAllocs[i] = a.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
	drains.Resp.Respond(index, nil)
	index++
@@ -195,7 +196,21 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
		updates = append(updates, a, replacement)
		replacements[i] = replacement.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	index++
+
+	// The drained allocs stopping cause migrations but no new drains
+	// because the replacements have not started
+	assertJobWatcherOps(t, jobWatcher, 0, 0)
+
+	// Client sends stop on these allocs
+	completeAllocs := make([]*structs.Allocation, len(drainedAllocs))
+	for i, a := range drainedAllocs {
+		a = a.Copy()
+		a.ClientStatus = structs.AllocClientStatusComplete
+		completeAllocs[i] = a
+	}
+	must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, completeAllocs))
	index++

	// The drained allocs stopping cause migrations but no new drains
@@ -209,10 +224,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
			Healthy: pointer.Of(true),
		}
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
	index++

-	require.NotEmpty(jobWatcher.drainingJobs())
+	must.MapNotEmpty(t, jobWatcher.drainingJobs())

	// 6 new drains
	drains, _ = assertJobWatcherOps(t, jobWatcher, 6, 0)
@@ -225,7 +240,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
		// create a copy so we can reuse this slice
		drainedAllocs[i] = a.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
	drains.Resp.Respond(index, nil)
	index++
@@ -236,12 +251,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
	for i, a := range drainedAllocs {
		a.DesiredTransition.Migrate = nil
		a.DesiredStatus = structs.AllocDesiredStatusStop
+		a.ClientStatus = structs.AllocClientStatusComplete

		replacement := newAlloc(runningNode, a.Job)
		updates = append(updates, a, replacement)
		replacements[i] = replacement.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
	index++

	assertJobWatcherOps(t, jobWatcher, 0, 6)
@@ -252,10 +268,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
			Healthy: pointer.Of(true),
		}
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
	index++

-	require.NotEmpty(jobWatcher.drainingJobs())
+	must.MapNotEmpty(t, jobWatcher.drainingJobs())

	// Final 4 new drains
	drains, _ = assertJobWatcherOps(t, jobWatcher, 4, 0)
@@ -268,7 +284,7 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
		// create a copy so we can reuse this slice
		drainedAllocs[i] = a.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, drainedAllocs))
	drains.Resp.Respond(index, nil)
	index++
@@ -279,12 +295,13 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
	for i, a := range drainedAllocs {
		a.DesiredTransition.Migrate = nil
		a.DesiredStatus = structs.AllocDesiredStatusStop
+		a.ClientStatus = structs.AllocClientStatusComplete

		replacement := newAlloc(runningNode, a.Job)
		updates = append(updates, a, replacement)
		replacements[i] = replacement.Copy()
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, updates))
	index++

	assertJobWatcherOps(t, jobWatcher, 0, 4)
@@ -295,70 +312,55 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
			Healthy: pointer.Of(true),
		}
	}
-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, replacements))

	// No jobs should be left!
-	require.Empty(jobWatcher.drainingJobs())
+	must.MapEmpty(t, jobWatcher.drainingJobs())
}

-// DrainingJobWatcher tests:
-// TODO Test that the watcher cancels its query when a new job is registered
-
-// handleTaskGroupTestCase is the test case struct for TestHandleTaskGroup
-//
-// Two nodes will be initialized: one draining and one running.
-type handleTaskGroupTestCase struct {
-	// Name of test
-	Name string
-
-	// Batch uses a batch job and alloc
-	Batch bool
-
-	// Expectations
-	ExpectedDrained  int
-	ExpectedMigrated int
-	ExpectedDone     bool
-
-	// Count overrides the default count of 10 if set
-	Count int
-
-	// MaxParallel overrides the default max_parallel of 1 if set
-	MaxParallel int
-
-	// AddAlloc will be called 10 times to create test allocs
-	//
-	// Allocs default to be healthy on the draining node
-	AddAlloc func(i int, a *structs.Allocation, drainingID, runningID string)
-}
-
-func TestHandeTaskGroup_Table(t *testing.T) {
+// TestDrainingJobWatcher_HandleTaskGroup tests that the watcher handles
+// allocation updates as expected.
+func TestDrainingJobWatcher_HandleTaskGroup(t *testing.T) {
	ci.Parallel(t)

-	cases := []handleTaskGroupTestCase{
+	testCases := []struct {
+		name        string
+		batch       bool // use a batch job
+		allocCount  int  // number of allocs in test (defaults to 10)
+		maxParallel int  // max_parallel (defaults to 1)
+
+		// addAllocFn will be called allocCount times to create test allocs,
+		// and the allocs default to be healthy on the draining node
+		addAllocFn func(idx int, a *structs.Allocation, drainingID, runningID string)
+
+		expectDrained  int
+		expectMigrated int
+		expectDone     bool
+	}{
		{
-			// All allocs on draining node
-			Name:             "AllDraining",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
+			// all allocs on draining node, should respect max_parallel=1
+			name:           "drain-respects-max-parallel-1",
+			expectDrained:  1,
+			expectMigrated: 0,
+			expectDone:     false,
		},
		{
-			// All allocs on non-draining node
-			Name:             "AllNonDraining",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// allocs on a non-draining node, should not be drained
+			name:           "allocs-on-non-draining-node-should-not-drain",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				a.NodeID = runningID
			},
		},
		{
-			// Some allocs on non-draining node but not healthy
-			Name:             "SomeNonDrainingUnhealthy",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// even unhealthy allocs on a non-draining node should not be drained
+			name:           "unhealthy-allocs-on-non-draining-node-should-not-drain",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				if i%2 == 0 {
					a.NodeID = runningID
					a.DeploymentStatus = nil
@@ -366,24 +368,24 @@ func TestHandeTaskGroup_Table(t *testing.T) {
			},
		},
		{
-			// One draining, other allocs on non-draining node and healthy
-			Name:             "OneDraining",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// only the alloc on draining node should be drained
+			name:           "healthy-alloc-draining-node-should-drain",
+			expectDrained:  1,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				if i != 0 {
					a.NodeID = runningID
				}
			},
		},
		{
-			// One already draining, other allocs on non-draining node and healthy
-			Name:             "OneAlreadyDraining",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// alloc that's still draining doesn't produce more result updates
+			name:           "still-draining-alloc-no-new-updates",
+			expectDrained:  0,
+			expectMigrated: 0,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				if i == 0 {
					a.DesiredTransition.Migrate = pointer.Of(true)
					return
@@ -392,77 +394,97 @@ func TestHandeTaskGroup_Table(t *testing.T) {
			},
		},
		{
-			// One already drained, other allocs on non-draining node and healthy
-			Name:             "OneAlreadyDrained",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 1,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// alloc that's finished draining gets marked as migrated
+			name:           "client-terminal-alloc-drain-should-be-finished",
+			expectDrained:  0,
+			expectMigrated: 1,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				if i == 0 {
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
					return
				}
				a.NodeID = runningID
			},
		},
		{
-			// One already drained, other allocs on non-draining node and healthy
-			Name:             "OneAlreadyDrainedBatched",
-			Batch:            true,
-			ExpectedDrained:  0,
-			ExpectedMigrated: 1,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// batch alloc that's finished draining gets marked as migrated
+			name:           "client-terminal-batch-alloc-drain-should-be-finished",
+			batch:          true,
+			expectDrained:  0,
+			expectMigrated: 1,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				if i == 0 {
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
					return
				}
				a.NodeID = runningID
			},
		},
		{
-			// All allocs are terminl, nothing to be drained
-			Name:             "AllMigrating",
-			ExpectedDrained:  0,
-			ExpectedMigrated: 10,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// all allocs are client-terminal, so nothing left to drain
+			name:           "all-client-terminal-drain-should-be-finished",
+			expectDrained:  0,
+			expectMigrated: 10,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				a.DesiredStatus = structs.AllocDesiredStatusStop
+				a.ClientStatus = structs.AllocClientStatusComplete
			},
		},
		{
-			// All allocs are terminl, nothing to be drained
-			Name:             "AllMigratingBatch",
-			Batch:            true,
-			ExpectedDrained:  0,
-			ExpectedMigrated: 10,
-			ExpectedDone:     true,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// all allocs are terminal, but only half are client-terminal
+			name:           "half-client-terminal-drain-should-not-be-finished",
+			expectDrained:  0,
+			expectMigrated: 5,
+			expectDone:     false,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				a.DesiredStatus = structs.AllocDesiredStatusStop
+				if i%2 == 0 {
+					a.ClientStatus = structs.AllocClientStatusComplete
+				}
			},
		},
		{
-			// All allocs may be drained at once
-			Name:             "AllAtOnce",
-			ExpectedDrained:  10,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			MaxParallel:      10,
+			// All allocs are terminal, nothing to be drained
+			name:           "all-terminal-batch",
+			batch:          true,
+			expectDrained:  0,
+			expectMigrated: 10,
+			expectDone:     true,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
+				a.DesiredStatus = structs.AllocDesiredStatusStop
+				a.ClientStatus = structs.AllocClientStatusComplete
+			},
		},
		{
-			// Drain 2
-			Name:             "Drain2",
-			ExpectedDrained:  2,
-			ExpectedMigrated: 0,
-			ExpectedDone:     false,
-			MaxParallel:      2,
+			// with max_parallel=10, all allocs can be drained at once
+			name:           "drain-respects-max-parallel-all-at-once",
+			expectDrained:  10,
+			expectMigrated: 0,
+			expectDone:     false,
+			maxParallel:    10,
		},
		{
-			// One on new node, one drained, and one draining
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      2,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=2, up to 2 allocs can be drained at a time
+			name:           "drain-respects-max-parallel-2",
+			expectDrained:  2,
+			expectMigrated: 0,
+			expectDone:     false,
+			maxParallel:    2,
+		},
+		{
+			// with max_parallel=2, up to 2 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 1 has completed
+			// migrating
+			name:           "notify-migrated-1-on-new-1-drained-1-draining",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    2,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				switch i {
				case 0:
					// One alloc on running node
@@ -470,44 +492,55 @@ func TestHandeTaskGroup_Table(t *testing.T) {
				case 1:
					// One alloc already migrated
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
				}
			},
		},
		{
-			// 8 on new node, one drained, and one draining
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      2,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=2, up to 2 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 1 has completed
+			// migrating
+			name:           "notify-migrated-8-on-new-1-drained-1-draining",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    2,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				switch i {
				case 0, 1, 2, 3, 4, 5, 6, 7:
					a.NodeID = runningID
				case 8:
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
				}
			},
		},
		{
			// 5 on new node, two drained, and three draining
-			ExpectedDrained:  3,
-			ExpectedMigrated: 2,
-			MaxParallel:      5,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// with max_parallel=5, up to 5 allocs can be drained at a time but
+			// we haven't yet informed the drainer that 2 have completed
+			// migrating
+			name:           "notify-migrated-5-on-new-2-drained-3-draining",
+			expectDrained:  3,
+			expectMigrated: 2,
+			maxParallel:    5,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				switch i {
				case 0, 1, 2, 3, 4:
					a.NodeID = runningID
				case 8, 9:
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
				}
			},
		},
		{
-			// Not all on new node have health set
-			Name:             "PendingHealth",
-			ExpectedDrained:  1,
-			ExpectedMigrated: 1,
-			MaxParallel:      3,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// half the allocs have been moved to the new node but 1 doesn't
+			// have health set yet, so we should have MaxParallel - 1 in flight
+			name:           "pending-health-blocks",
+			expectDrained:  1,
+			expectMigrated: 1,
+			maxParallel:    3,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				switch i {
				case 0:
					// Deployment status UNset for 1 on new node
@@ -518,16 +551,18 @@ func TestHandeTaskGroup_Table(t *testing.T) {
					a.NodeID = runningID
				case 9:
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
				}
			},
		},
		{
-			// 5 max parallel - 1 migrating - 2 with unset health = 2 drainable
-			Name:             "PendingHealthHigherMax",
-			ExpectedDrained:  2,
-			ExpectedMigrated: 1,
-			MaxParallel:      5,
-			AddAlloc: func(i int, a *structs.Allocation, drainingID, runningID string) {
+			// half the allocs have been moved to the new node but 2 don't have
+			// health set yet, so we should have MaxParallel - 2 in flight
+			name:           "pending-health-blocks-higher-max",
+			expectDrained:  2,
+			expectMigrated: 1,
+			maxParallel:    5,
+			addAllocFn: func(i int, a *structs.Allocation, drainingID, runningID string) {
				switch i {
				case 0, 1:
					// Deployment status UNset for 2 on new node
@@ -538,45 +573,38 @@ func TestHandeTaskGroup_Table(t *testing.T) {
					a.NodeID = runningID
				case 9:
					a.DesiredStatus = structs.AllocDesiredStatusStop
+					a.ClientStatus = structs.AllocClientStatusComplete
				}
			},
		},
	}

-	for _, testCase := range cases {
-		t.Run(testCase.Name, func(t *testing.T) {
-			testHandleTaskGroup(t, testCase)
-		})
-	}
-}
-
-func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
			ci.Parallel(t)
-	require := require.New(t)
-	assert := assert.New(t)

			// Create nodes
-	state := state.TestStateStore(t)
-	drainingNode, runningNode := testNodes(t, state)
+			store := state.TestStateStore(t)
+			drainingNode, runningNode := testNodes(t, store)

			job := mock.Job()
-	if tc.Batch {
+			if tc.batch {
				job = mock.BatchJob()
			}
			job.TaskGroups[0].Count = 10
-	if tc.Count > 0 {
-		job.TaskGroups[0].Count = tc.Count
+			if tc.allocCount > 0 {
+				job.TaskGroups[0].Count = tc.allocCount
			}
-	if tc.MaxParallel > 0 {
-		job.TaskGroups[0].Migrate.MaxParallel = tc.MaxParallel
+			if tc.maxParallel > 0 {
+				job.TaskGroups[0].Migrate.MaxParallel = tc.maxParallel
			}
-	require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job))
+			must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 102, nil, job))

			var allocs []*structs.Allocation
			for i := 0; i < 10; i++ {
				a := mock.Alloc()
-		if tc.Batch {
+				if tc.batch {
					a = mock.BatchAlloc()
				}
				a.JobID = job.ID
@@ -588,23 +616,23 @@ func testHandleTaskGroup(t *testing.T, tc handleTaskGroupTestCase) {
				a.DeploymentStatus = &structs.AllocDeploymentStatus{
					Healthy: pointer.Of(true),
				}
-		if tc.AddAlloc != nil {
-			tc.AddAlloc(i, a, drainingNode.ID, runningNode.ID)
+				if tc.addAllocFn != nil {
+					tc.addAllocFn(i, a, drainingNode.ID, runningNode.ID)
				}
				allocs = append(allocs, a)
			}

-	require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs))
-	snap, err := state.Snapshot()
-	require.Nil(err)
+			must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 103, allocs))
+			snap, err := store.Snapshot()
+			must.NoError(t, err)

			res := newJobResult()
-	require.Nil(handleTaskGroup(snap, tc.Batch, job.TaskGroups[0], allocs, 102, res))
-	assert.Lenf(res.drain, tc.ExpectedDrained, "Drain expected %d but found: %d",
-		tc.ExpectedDrained, len(res.drain))
-	assert.Lenf(res.migrated, tc.ExpectedMigrated, "Migrate expected %d but found: %d",
-		tc.ExpectedMigrated, len(res.migrated))
-	assert.Equal(tc.ExpectedDone, res.done)
+			must.NoError(t, handleTaskGroup(snap, tc.batch, job.TaskGroups[0], allocs, 102, res))
+			test.Len(t, tc.expectDrained, res.drain, test.Sprint("expected drained allocs"))
+			test.Len(t, tc.expectMigrated, res.migrated, test.Sprint("expected migrated allocs"))
+			test.Eq(t, tc.expectDone, res.done)
+		})
+	}
}

func TestHandleTaskGroup_Migrations(t *testing.T) {
@@ -638,6 +666,7 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
		if i%2 == 0 {
			a.DesiredStatus = structs.AllocDesiredStatusStop
+			a.ClientStatus = structs.AllocClientStatusComplete
		} else {
			a.ClientStatus = structs.AllocClientStatusFailed
		}
@@ -707,6 +736,7 @@ func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
		if i%2 == 0 {
			a.DesiredStatus = structs.AllocDesiredStatusStop
+			a.ClientStatus = structs.AllocClientStatusComplete
		} else {
			a.ClientStatus = structs.AllocClientStatusFailed
		}

@@ -56,6 +56,8 @@ func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context,
				continue
			}

+			switch alloc.DesiredStatus {
+			case structs.AllocDesiredStatusRun:
				if alloc.DeploymentStatus.HasHealth() {
					continue // only update to healthy once
				}
@@ -66,6 +68,17 @@ func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context,
				}
				updates = append(updates, newAlloc)
				logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID)
+
+			case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
+				if alloc.ClientStatus == structs.AllocClientStatusComplete {
+					continue // only update to complete once
+				}
+
+				newAlloc := alloc.Copy()
+				newAlloc.ClientStatus = structs.AllocClientStatusComplete
+				updates = append(updates, newAlloc)
+				logger.Trace("marking alloc complete", "alloc_id", alloc.ID)
+			}
		}

		if len(updates) == 0 {
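For readers following the test changes, the pattern the updated tests rely on is a two-step status transition: the server first marks the allocation's desired status as stop, and only a later client-originated update makes it client-terminal, which is what the drainer now waits for. A hypothetical helper sketching that sequence (not part of the commit; it reuses the state-store and test helpers already visible in the diffs above):

```go
package sketch

import (
	"testing"

	"github.com/shoenig/test/must"

	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// markStoppedThenComplete is an illustrative helper showing the two-step
// update: a server-side stop followed by a client-side completion. It returns
// the next unused Raft index.
func markStoppedThenComplete(t *testing.T, store *state.StateStore, index uint64, alloc *structs.Allocation) uint64 {
	stopped := alloc.Copy()
	stopped.DesiredStatus = structs.AllocDesiredStatusStop // server decides to stop
	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{stopped}))
	index++

	// Only this client-originated update makes the alloc client-terminal,
	// which is what now lets the drainer count it as migrated and finish.
	completed := stopped.Copy()
	completed.ClientStatus = structs.AllocClientStatusComplete
	must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, []*structs.Allocation{completed}))
	return index + 1
}
```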