From c132952ba2104353490caa23658512bd5187c472 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 27 Jul 2016 11:54:55 -0700 Subject: [PATCH 1/2] filterCompleteAllocs filters replaced batch allocs --- scheduler/generic_sched.go | 43 +++++++++---------- scheduler/generic_sched_test.go | 74 +++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 23 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e6cf2465c..830e4b356 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -4,7 +4,6 @@ import ( "fmt" "log" - "github.com/davecgh/go-spew/spew" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" ) @@ -298,7 +297,26 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) [] n-- } } - return allocs[:n] + + // If the job is batch, we want to filter allocations that have been + // replaced by a newer version for the same task group. + filtered := allocs[:n] + if s.batch { + byTG := make(map[string]*structs.Allocation) + for _, alloc := range filtered { + existing := byTG[alloc.TaskGroup] + if existing == nil || existing.CreateIndex < alloc.CreateIndex { + byTG[alloc.TaskGroup] = alloc + } + } + + filtered = make([]*structs.Allocation, 0, len(byTG)) + for _, alloc := range byTG { + filtered = append(filtered, alloc) + } + } + + return filtered } // computeJobAllocs is used to reconcile differences between the job, @@ -331,27 +349,6 @@ func (s *GenericScheduler) computeJobAllocs() error { diff := diffAllocs(s.job, tainted, groups, allocs) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) - // XXX: For debugging purposes only. An issue was observed where a job had a - // task group with count > 0 that produced a diff where no action would be - // taken (every slice was empty). Below we dump debug information if this - // condition is hit. - diffSum := len(diff.stop) + len(diff.place) + len(diff.ignore) + - len(diff.update) + len(diff.migrate) - if diffSum == 0 && len(groups) != 0 { - s.logger.Printf("[ERR] sched: %d tasks to schedule but scheduler believes there is no work", len(groups)) - - // Get the original set of allocations for the job. - jobAllocs, err := s.state.AllocsByJob(s.eval.JobID) - if err != nil { - return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err) - } - s.logger.Printf("[DEBUG] sched: job: %s", spew.Sdump(s.job)) - s.logger.Printf("[DEBUG] sched: materializeTaskGroups() returned: %s", spew.Sdump(groups)) - s.logger.Printf("[DEBUG] sched: AllocsByJob(%q) returned: %s", s.eval.JobID, spew.Sdump(jobAllocs)) - s.logger.Printf("[DEBUG] sched: filterCompleteAllocs(): %s", spew.Sdump(allocs)) - s.logger.Printf("[DEBUG] sched: taintedNodes(): %s", spew.Sdump(tainted)) - } - // Add all the allocs to stop for _, e := range diff.stop { s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index daf85869e..9a9bc015c 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1579,3 +1579,77 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +func TestGenericSched_FilterCompleteAllocs(t *testing.T) { + running := mock.Alloc() + desiredStop := mock.Alloc() + desiredStop.DesiredStatus = structs.AllocDesiredStatusStop + + new := mock.Alloc() + new.CreateIndex = 10000 + + oldSuccessful := mock.Alloc() + oldSuccessful.CreateIndex = 30 + oldSuccessful.DesiredStatus = structs.AllocDesiredStatusStop + oldSuccessful.ClientStatus = structs.AllocClientStatusComplete + oldSuccessful.TaskStates = make(map[string]*structs.TaskState, 1) + oldSuccessful.TaskStates["foo"] = &structs.TaskState{ + State: structs.TaskStateDead, + Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 0}}, + } + + unsuccessful := mock.Alloc() + unsuccessful.DesiredStatus = structs.AllocDesiredStatusRun + unsuccessful.ClientStatus = structs.AllocClientStatusFailed + unsuccessful.TaskStates = make(map[string]*structs.TaskState, 1) + unsuccessful.TaskStates["foo"] = &structs.TaskState{ + State: structs.TaskStateDead, + Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 1}}, + } + + cases := []struct { + Batch bool + Input, Output []*structs.Allocation + }{ + { + Input: []*structs.Allocation{running}, + Output: []*structs.Allocation{running}, + }, + { + Input: []*structs.Allocation{running, desiredStop}, + Output: []*structs.Allocation{running}, + }, + { + Batch: true, + Input: []*structs.Allocation{running}, + Output: []*structs.Allocation{running}, + }, + { + Batch: true, + Input: []*structs.Allocation{new, oldSuccessful}, + Output: []*structs.Allocation{new}, + }, + { + Batch: true, + Input: []*structs.Allocation{unsuccessful}, + Output: []*structs.Allocation{}, + }, + } + + for i, c := range cases { + g := &GenericScheduler{batch: c.Batch} + out := g.filterCompleteAllocs(c.Input) + + if !reflect.DeepEqual(out, c.Output) { + t.Log("Got:") + for i, a := range out { + t.Logf("%d: %#v", i, a) + } + t.Log("Want:") + for i, a := range c.Output { + t.Logf("%d: %#v", i, a) + } + t.Fatalf("Case %d failed", i+1) + } + } +} From eb0840546734475cf02d00e8434f06c7b648abde Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 28 Jul 2016 14:02:50 -0700 Subject: [PATCH 2/2] Updated tests and added logic to system sched --- scheduler/generic_sched_test.go | 2 +- scheduler/system_sched.go | 5 ++++ scheduler/system_sched_test.go | 41 +++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 69e72e69d..df1c63a16 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1337,7 +1337,7 @@ func TestServiceSched_NodeUpdate(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if h.Evals[0].QueuedAllocations["web"] != 0 { + if val, ok := h.Evals[0].QueuedAllocations["web"]; !ok || val != 0 { t.Fatalf("bad queued allocations: %v", h.Evals[0].QueuedAllocations) } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 812a8b16e..7994efff7 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -224,6 +224,11 @@ func (s *SystemScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { + if s.job != nil { + for _, tg := range s.job.TaskGroups { + s.queuedAllocs[tg.Name] = 0 + } + } return nil } diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 46865cccf..f273e0c7d 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -756,6 +756,47 @@ func TestSystemSched_NodeDrain(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_NodeUpdate(t *testing.T) { + h := NewHarness(t) + + // Register a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Generate a fake job allocated on that node. + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to deal + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure that queued allocations is zero + if val, ok := h.Evals[0].QueuedAllocations["web"]; !ok || val != 0 { + t.Fatalf("bad queued allocations: %#v", h.Evals[0].QueuedAllocations) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestSystemSched_RetryLimit(t *testing.T) { h := NewHarness(t) h.Planner = &RejectPlan{h}