Merge branch 'master' of github.com:hashicorp/nomad

This commit is contained in:
Alex Dadgar 2016-07-28 14:32:58 -07:00
commit 6f3f5c8ba5
4 changed files with 141 additions and 24 deletions

View file

@@ -4,7 +4,6 @@ import (
"fmt"
"log"
"github.com/davecgh/go-spew/spew"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -309,7 +308,26 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []
n--
}
}
return allocs[:n]
// If the job is batch, we want to filter allocations that have been
// replaced by a newer version for the same task group.
filtered := allocs[:n]
if s.batch {
byTG := make(map[string]*structs.Allocation)
for _, alloc := range filtered {
existing := byTG[alloc.TaskGroup]
if existing == nil || existing.CreateIndex < alloc.CreateIndex {
byTG[alloc.TaskGroup] = alloc
}
}
filtered = make([]*structs.Allocation, 0, len(byTG))
for _, alloc := range byTG {
filtered = append(filtered, alloc)
}
}
return filtered
}
// computeJobAllocs is used to reconcile differences between the job,
@@ -342,27 +360,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
diff := diffAllocs(s.job, tainted, groups, allocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// XXX: For debugging purposes only. An issue was observed where a job had a
// task group with count > 0 that produced a diff where no action would be
// taken (every slice was empty). Below we dump debug information if this
// condition is hit.
diffSum := len(diff.stop) + len(diff.place) + len(diff.ignore) +
len(diff.update) + len(diff.migrate)
if diffSum == 0 && len(groups) != 0 {
s.logger.Printf("[ERR] sched: %d tasks to schedule but scheduler believes there is no work", len(groups))
// Get the original set of allocations for the job.
jobAllocs, err := s.state.AllocsByJob(s.eval.JobID)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
}
s.logger.Printf("[DEBUG] sched: job: %s", spew.Sdump(s.job))
s.logger.Printf("[DEBUG] sched: materializeTaskGroups() returned: %s", spew.Sdump(groups))
s.logger.Printf("[DEBUG] sched: AllocsByJob(%q) returned: %s", s.eval.JobID, spew.Sdump(jobAllocs))
s.logger.Printf("[DEBUG] sched: filterCompleteAllocs(): %s", spew.Sdump(allocs))
s.logger.Printf("[DEBUG] sched: taintedNodes(): %s", spew.Sdump(tainted))
}
// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)

View file

@@ -1337,7 +1337,7 @@ func TestServiceSched_NodeUpdate(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if h.Evals[0].QueuedAllocations["web"] != 0 {
if val, ok := h.Evals[0].QueuedAllocations["web"]; !ok || val != 0 {
t.Fatalf("bad queued allocations: %v", h.Evals[0].QueuedAllocations)
}
@@ -1866,3 +1866,77 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// TestGenericSched_FilterCompleteAllocs exercises filterCompleteAllocs for
// both service and batch jobs: complete/stopped allocations are dropped, and
// for batch jobs an older allocation is dropped when a newer one exists for
// the same task group.
func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
	running := mock.Alloc()

	desiredStop := mock.Alloc()
	desiredStop.DesiredStatus = structs.AllocDesiredStatusStop

	// Named newAlloc (not "new") to avoid shadowing the predeclared Go
	// identifier. Its higher CreateIndex makes it supersede oldSuccessful
	// for the same task group under batch filtering.
	newAlloc := mock.Alloc()
	newAlloc.CreateIndex = 10000

	oldSuccessful := mock.Alloc()
	oldSuccessful.CreateIndex = 30
	oldSuccessful.DesiredStatus = structs.AllocDesiredStatusStop
	oldSuccessful.ClientStatus = structs.AllocClientStatusComplete
	oldSuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
	oldSuccessful.TaskStates["foo"] = &structs.TaskState{
		State:  structs.TaskStateDead,
		Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 0}},
	}

	unsuccessful := mock.Alloc()
	unsuccessful.DesiredStatus = structs.AllocDesiredStatusRun
	unsuccessful.ClientStatus = structs.AllocClientStatusFailed
	unsuccessful.TaskStates = make(map[string]*structs.TaskState, 1)
	unsuccessful.TaskStates["foo"] = &structs.TaskState{
		State:  structs.TaskStateDead,
		Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 1}},
	}

	cases := []struct {
		Batch         bool
		Input, Output []*structs.Allocation
	}{
		{
			Input:  []*structs.Allocation{running},
			Output: []*structs.Allocation{running},
		},
		{
			Input:  []*structs.Allocation{running, desiredStop},
			Output: []*structs.Allocation{running},
		},
		{
			Batch:  true,
			Input:  []*structs.Allocation{running},
			Output: []*structs.Allocation{running},
		},
		{
			Batch:  true,
			Input:  []*structs.Allocation{newAlloc, oldSuccessful},
			Output: []*structs.Allocation{newAlloc},
		},
		{
			Batch:  true,
			Input:  []*structs.Allocation{unsuccessful},
			Output: []*structs.Allocation{},
		},
	}

	for i, c := range cases {
		g := &GenericScheduler{batch: c.Batch}
		out := g.filterCompleteAllocs(c.Input)

		if !reflect.DeepEqual(out, c.Output) {
			// Use j for the inner indices so the case index i is not
			// shadowed before the Fatalf below reports it.
			t.Log("Got:")
			for j, a := range out {
				t.Logf("%d: %#v", j, a)
			}
			t.Log("Want:")
			for j, a := range c.Output {
				t.Logf("%d: %#v", j, a)
			}
			t.Fatalf("Case %d failed", i+1)
		}
	}
}

View file

@@ -224,6 +224,11 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if s.job != nil {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}
}
return nil
}

View file

@@ -756,6 +756,47 @@ func TestSystemSched_NodeDrain(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// TestSystemSched_NodeUpdate verifies that a node-update evaluation for a
// system job with an existing allocation completes and reports zero queued
// allocations for the task group.
func TestSystemSched_NodeUpdate(t *testing.T) {
	h := NewHarness(t)

	// Register a node.
	node := mock.Node()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create a system job with one allocation placed on that node.
	job := mock.SystemJob()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	alloc := mock.Alloc()
	alloc.Job = job
	alloc.JobID = job.ID
	alloc.NodeID = node.ID
	alloc.Name = "my-job.web[0]"
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

	// Build an evaluation triggered by the node update.
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		JobID:       job.ID,
		NodeID:      node.ID,
	}

	// Run the scheduler over the evaluation.
	if err := h.Process(NewSystemScheduler, eval); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The queued count for "web" must be present and exactly zero.
	if val, ok := h.Evals[0].QueuedAllocations["web"]; !ok || val != 0 {
		t.Fatalf("bad queued allocations: %#v", h.Evals[0].QueuedAllocations)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_RetryLimit(t *testing.T) {
h := NewHarness(t)
h.Planner = &RejectPlan{h}