Merge pull request #3217 from hashicorp/b-batch-filter

Fix batch handling of complete allocs/node drains
This commit is contained in:
Alex Dadgar 2017-09-14 15:11:40 -07:00 committed by GitHub
commit c08f9e729f
6 changed files with 353 additions and 191 deletions

View file

@ -6,6 +6,12 @@ IMPROVEMENTS:
BUG FIXES:
* core: Fix restoration of stopped periodic jobs [GH-3201]
* core: Fix issue where node-drain with complete batch allocation would create
replacement [GH-3217]
* core: Fix issue in which batch allocations from previous job versions may not
have been stopped properly. [GH-3217]
* core: Fix issue in which allocations with the same name during a scale
down/stop event wouldn't be properly stopped [GH-3217]
* core: Fix a race condition in which scheduling results from one invocation of
the scheduler wouldn't be considered by the next for the same job [GH-3206]
* api: Sort /v1/agent/servers output so that output of Consul checks does not

View file

@ -4630,18 +4630,7 @@ func (a *Allocation) Terminated() bool {
// RanSuccessfully returns whether the client has ran the allocation and all
// tasks finished successfully
func (a *Allocation) RanSuccessfully() bool {
// Handle the case the client hasn't started the allocation.
if len(a.TaskStates) == 0 {
return false
}
// Check to see if all the tasks finised successfully in the allocation
allSuccess := true
for _, state := range a.TaskStates {
allSuccess = allSuccess && state.Successful()
}
return allSuccess
return a.ClientStatus == AllocClientStatusComplete
}
// ShouldMigrate returns if the allocation needs data migration

View file

@ -294,7 +294,7 @@ func (s *GenericScheduler) process() (bool, error) {
// filterCompleteAllocs filters allocations that are terminal and should be
// re-placed.
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) {
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation {
filter := func(a *structs.Allocation) bool {
if s.batch {
// Allocs from batch jobs should be filtered when the desired status
@ -319,19 +319,9 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
return a.TerminalStatus()
}
terminalAllocsByName := make(map[string]*structs.Allocation)
n := len(allocs)
for i := 0; i < n; i++ {
if filter(allocs[i]) {
// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}
// Remove the allocation
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
@ -339,25 +329,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
}
}
// If the job is batch, we want to filter allocations that have been
// replaced by a newer version for the same task group.
filtered := allocs[:n]
if s.batch {
byTG := make(map[string]*structs.Allocation)
for _, alloc := range filtered {
existing := byTG[alloc.Name]
if existing == nil || existing.CreateIndex < alloc.CreateIndex {
byTG[alloc.Name] = alloc
}
}
filtered = make([]*structs.Allocation, 0, len(byTG))
for _, alloc := range byTG {
filtered = append(filtered, alloc)
}
}
return filtered, terminalAllocsByName
return allocs[:n]
}
// computeJobAllocs is used to reconcile differences between the job,
@ -383,7 +355,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state
allocs, _ = s.filterCompleteAllocs(allocs)
allocs = s.filterCompleteAllocs(allocs)
reconciler := NewAllocReconciler(s.ctx.Logger(),
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),

View file

@ -2643,6 +2643,7 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) {
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
@ -2688,61 +2689,6 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestBatchSched_Run_DrainedAlloc(t *testing.T) {
h := NewHarness(t)
// Create a node
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Create a job
job := mock.Job()
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a complete alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewBatchScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Ensure a replacement alloc was placed.
if len(out) != 2 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestBatchSched_Run_FailedAlloc(t *testing.T) {
h := NewHarness(t)
@ -2752,6 +2698,7 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
@ -2813,6 +2760,7 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
@ -2948,12 +2896,7 @@ func TestBatchSched_JobModify_InPlace_Terminal(t *testing.T) {
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Update the job
job2 := mock.Job()
job2.ID = job.ID
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to deal with drain
// Create a mock evaluation to trigger the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
@ -2969,106 +2912,268 @@ func TestBatchSched_JobModify_InPlace_Terminal(t *testing.T) {
}
// Ensure no plan
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans[0])
}
}
// This test ensures that terminal jobs from older versions are ignored.
func TestBatchSched_JobModify_Destructive_Terminal(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations
job := mock.Job()
job.Type = structs.JobTypeBatch
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.ClientStatus = structs.AllocClientStatusComplete
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Update the job
job2 := mock.Job()
job2.ID = job.ID
job2.Type = structs.JobTypeBatch
job2.Version++
job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"}
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
allocs = nil
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job2
alloc.JobID = job2.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.TaskStates = map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{
{
Type: structs.TaskTerminated,
ExitCode: 0,
},
},
},
}
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewBatchScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a plan
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans)
}
}
func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
running := mock.Alloc()
desiredStop := mock.Alloc()
desiredStop.DesiredStatus = structs.AllocDesiredStatusStop
// This test asserts that an allocation from an old job that is running on a
// drained node is cleaned up.
func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) {
h := NewHarness(t)
new := mock.Alloc()
new.CreateIndex = 10000
// Create two nodes, one that is drained and has a successfully finished
// alloc and a fresh undrained one
node := mock.Node()
node.Drain = true
node2 := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
noErr(t, h.State.UpsertNode(h.NextIndex(), node2))
oldSuccessful := mock.Alloc()
oldSuccessful.CreateIndex = 30
oldSuccessful.DesiredStatus = structs.AllocDesiredStatusStop
oldSuccessful.ClientStatus = structs.AllocClientStatusComplete
oldSuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
oldSuccessful.TaskStates["foo"] = &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 0}},
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a running alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusRunning
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create an update job
job2 := job.Copy()
job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"}
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
unsuccessful := mock.Alloc()
unsuccessful.DesiredStatus = structs.AllocDesiredStatusRun
unsuccessful.ClientStatus = structs.AllocClientStatusFailed
unsuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
unsuccessful.TaskStates["foo"] = &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 1}},
// Process the evaluation
err := h.Process(NewBatchScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
Batch bool
Input, Output []*structs.Allocation
TerminalAllocs map[string]*structs.Allocation
}{
{
Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{},
},
{
Input: []*structs.Allocation{running, desiredStop},
Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{
desiredStop.Name: desiredStop,
},
},
{
Batch: true,
Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{},
},
{
Batch: true,
Input: []*structs.Allocation{new, oldSuccessful},
Output: []*structs.Allocation{new},
TerminalAllocs: map[string]*structs.Allocation{},
},
{
Batch: true,
Input: []*structs.Allocation{unsuccessful},
Output: []*structs.Allocation{},
TerminalAllocs: map[string]*structs.Allocation{
unsuccessful.Name: unsuccessful,
},
},
// Ensure a plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
for i, c := range cases {
g := &GenericScheduler{batch: c.Batch}
out, terminalAllocs := g.filterCompleteAllocs(c.Input)
if !reflect.DeepEqual(out, c.Output) {
t.Log("Got:")
for i, a := range out {
t.Logf("%d: %#v", i, a)
}
t.Log("Want:")
for i, a := range c.Output {
t.Logf("%d: %#v", i, a)
}
t.Fatalf("Case %d failed", i+1)
}
if !reflect.DeepEqual(terminalAllocs, c.TerminalAllocs) {
t.Log("Got:")
for n, a := range terminalAllocs {
t.Logf("%v: %#v", n, a)
}
t.Log("Want:")
for n, a := range c.TerminalAllocs {
t.Logf("%v: %#v", n, a)
}
t.Fatalf("Case %d failed", i+1)
}
plan := h.Plans[0]
// Ensure the plan evicted 1
if len(plan.NodeUpdate[node.ID]) != 1 {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan places 1
if len(plan.NodeAllocation[node2.ID]) != 1 {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// This test asserts that an allocation from a job that is complete on a
// drained node is ignored up.
func TestBatchSched_NodeDrain_Complete(t *testing.T) {
h := NewHarness(t)
// Create two nodes, one that is drained and has a successfully finished
// alloc and a fresh undrained one
node := mock.Node()
node.Drain = true
node2 := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
noErr(t, h.State.UpsertNode(h.NextIndex(), node2))
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a complete alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusComplete
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewBatchScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure no plan
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// This is a slightly odd test but it ensures that we handle a scale down of a
// task group's count and that it works even if all the allocs have the same
// name.
func TestBatchSched_ScaleDown_SameName(t *testing.T) {
h := NewHarness(t)
// Create a node
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Create a job
job := mock.Job()
job.Type = structs.JobTypeBatch
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a few running alloc
var allocs []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusRunning
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewBatchScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted 4 of the 5
if len(plan.NodeUpdate[node.ID]) != 4 {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestGenericSched_ChainedAlloc(t *testing.T) {

View file

@ -296,6 +296,10 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}
}
// Filter batch allocations that do not need to be considered.
all, ignore := a.batchFiltration(all)
desiredChanges.Ignore += uint64(len(ignore))
canaries, all := a.handleGroupCanaries(all, desiredChanges)
// Determine what set of allocations are on tainted nodes
@ -484,6 +488,27 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
return deploymentComplete
}
// batchFiltration filters batch allocations that should be ignored. These are
// allocations that are terminal from a previous job version.
func (a *allocReconciler) batchFiltration(all allocSet) (filtered, ignore allocSet) {
if !a.batch {
return all, nil
}
filtered = filtered.union(all)
ignored := make(map[string]*structs.Allocation)
// Ignore terminal batch jobs from older versions
for id, alloc := range filtered {
if alloc.Job.Version < a.job.Version && alloc.TerminalStatus() {
delete(filtered, id)
ignored[id] = alloc
}
}
return filtered, ignored
}
// handleGroupCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
@ -673,12 +698,34 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
if _, remove := removeNames[alloc.Name]; remove {
if _, ok := removeNames[alloc.Name]; ok {
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop
}
}
}
// It is possible that we didn't stop as many as we should have if there
// were allocations with duplicate names.
for id, alloc := range untainted {
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop
}
}

View file

@ -150,6 +150,7 @@ func allocNameToIndex(name string) uint {
}
func assertNamesHaveIndexes(t *testing.T, indexes []int, names []string) {
t.Helper()
m := make(map[uint]int)
for _, i := range indexes {
m[uint(i)] += 1
@ -177,6 +178,7 @@ func assertNamesHaveIndexes(t *testing.T, indexes []int, names []string) {
}
func assertNoCanariesStopped(t *testing.T, d *structs.Deployment, stop []allocStopResult) {
t.Helper()
canaryIndex := make(map[string]struct{})
for _, state := range d.TaskGroups {
for _, c := range state.PlacedCanaries {
@ -192,6 +194,7 @@ func assertNoCanariesStopped(t *testing.T, d *structs.Deployment, stop []allocSt
}
func assertPlaceResultsHavePreviousAllocs(t *testing.T, numPrevious int, place []allocPlaceResult) {
t.Helper()
names := make(map[string]struct{}, numPrevious)
found := 0
@ -273,7 +276,7 @@ type resultExpectation struct {
}
func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
t.Helper()
if exp.createDeployment != nil && r.deployment == nil {
t.Fatalf("Expect a created deployment got none")
} else if exp.createDeployment == nil && r.deployment != nil {
@ -459,6 +462,46 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) {
assertNamesHaveIndexes(t, intRange(0, 19), stopResultsToNames(r.stop))
}
// Tests the reconciler properly handles stopping allocations for a job that has
// scaled down to zero desired where allocs have duplicate names
func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) {
// Set desired 0
job := mock.Job()
job.TaskGroups[0].Count = 0
// Create 20 existing allocations
var allocs []*structs.Allocation
var expectedStopped []int
for i := 0; i < 20; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = structs.GenerateUUID()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i%2))
allocs = append(allocs, alloc)
expectedStopped = append(expectedStopped, i%2)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Assert the correct results
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 0,
stop: 20,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Stop: 20,
},
},
})
assertNamesHaveIndexes(t, expectedStopped, stopResultsToNames(r.stop))
}
// Tests the reconciler properly handles inplace upgrading allocations
func TestReconciler_Inplace(t *testing.T) {
job := mock.Job()