From 52e9946da995fbf493be1263dc40928fd5c06e2f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 30 Aug 2016 15:36:30 -0700 Subject: [PATCH 1/3] Implemented SetPrefferingNodes in stack --- nomad/structs/funcs.go | 18 ++++- nomad/structs/funcs_test.go | 6 +- scheduler/context.go | 2 - scheduler/generic_sched.go | 47 ++++++++++-- scheduler/generic_sched_test.go | 125 ++++++++++++++++++++++++++++---- scheduler/stack.go | 13 ++++ scheduler/stack_test.go | 36 +++++++++ scheduler/system_sched.go | 4 +- scheduler/system_sched_test.go | 76 ++++++++++++++++++- scheduler/util.go | 16 +++- scheduler/util_test.go | 65 ++++++++++++++++- 11 files changed, 368 insertions(+), 40 deletions(-) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 08da1bd85..5822d22f9 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -28,17 +28,29 @@ func RemoveAllocs(alloc []*Allocation, remove []*Allocation) []*Allocation { return alloc } -// FilterTerminalAllocs filters out all allocations in a terminal state -func FilterTerminalAllocs(allocs []*Allocation) []*Allocation { +// FilterTerminalAllocs filters out all allocations in a terminal state and +// returns the terminal allocations +func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) { + terminalAllocsByName := make(map[string]*Allocation) n := len(allocs) for i := 0; i < n; i++ { if allocs[i].TerminalStatus() { + + // Add the allocation to the terminal allocs map if it's not already + // added or has a higher create index than the one which is + // currently present. + alloc, ok := terminalAllocsByName[allocs[i].Name] + if !ok || alloc.CreateIndex < allocs[i].CreateIndex { + terminalAllocsByName[allocs[i].Name] = allocs[i] + } + + // Remove the allocation allocs[i], allocs[n-1] = allocs[n-1], nil i-- n-- } } - return allocs[:n] + return allocs[:n], terminalAllocsByName } // AllocsFit checks if a given set of allocations will fit on a node. diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index ad5d044e0..b0f1f2194 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -38,13 +38,17 @@ func TestFilterTerminalAllocs(t *testing.T) { }, } - out := FilterTerminalAllocs(l) + out, terminalAllocs := FilterTerminalAllocs(l) if len(out) != 1 { t.Fatalf("bad: %#v", out) } if out[0].ID != "foo" { t.Fatalf("bad: %#v", out) } + + if len(terminalAllocs) != 1 { + t.Fatalf("bad: %#v", terminalAllocs) + } } func TestAllocsFit_PortsOvercommitted(t *testing.T) { diff --git a/scheduler/context.go b/scheduler/context.go index 7c9c4bc25..5f3366f46 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -1,7 +1,6 @@ package scheduler import ( - "fmt" "log" "regexp" @@ -269,7 +268,6 @@ func (e *EvalEligibility) JobStatus(class string) ComputedClassFeasibility { // will not have a computed class. The safest value to return is the escaped // case, since it disables any optimization. if e.jobEscaped || class == "" { - fmt.Println(e.jobEscaped, class) return EvalComputedClassEscaped } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 40590c2bb..f16217d91 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -278,7 +278,7 @@ func (s *GenericScheduler) process() (bool, error) { // filterCompleteAllocs filters allocations that are terminal and should be // re-placed. -func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation { +func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) { filter := func(a *structs.Allocation) bool { if s.batch { // Allocs from batch jobs should be filtered when the desired status @@ -303,9 +303,20 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) [] return a.TerminalStatus() } + terminalAllocsByName := make(map[string]*structs.Allocation) n := len(allocs) for i := 0; i < n; i++ { if filter(allocs[i]) { + + // Add the allocation to the terminal allocs map if it's not already + // added or has a higher create index than the one which is + // currently present. + alloc, ok := terminalAllocsByName[allocs[i].Name] + if !ok || alloc.CreateIndex < allocs[i].CreateIndex { + terminalAllocsByName[allocs[i].Name] = allocs[i] + } + + // Remove the allocation allocs[i], allocs[n-1] = allocs[n-1], nil i-- n-- @@ -330,7 +341,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) [] } } - return filtered + return filtered, terminalAllocsByName } // computeJobAllocs is used to reconcile differences between the job, @@ -361,10 +372,10 @@ func (s *GenericScheduler) computeJobAllocs() error { updateNonTerminalAllocsToLost(s.plan, tainted, allocs) // Filter out the allocations in a terminal state - allocs = s.filterCompleteAllocs(allocs) + allocs, terminalAllocs := s.filterCompleteAllocs(allocs) // Diff the required and existing allocations - diff := diffAllocs(s.job, tainted, groups, allocs) + diff := diffAllocs(s.job, tainted, groups, allocs, terminalAllocs) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) // Add all the allocs to stop @@ -435,8 +446,19 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { continue } + // Find the preferred node + preferredNode, err := s.findPreferredNode(&missing) + if err != nil { + return err + } + // Attempt to match the task group - option, _ := s.stack.Select(missing.TaskGroup) + var option *RankedNode + if preferredNode != nil { + option, _ = s.stack.SelectPreferringNodes(missing.TaskGroup, []*structs.Node{preferredNode}) + } else { + option, _ = s.stack.Select(missing.TaskGroup) + } // Store the available nodes by datacenter s.ctx.Metrics().NodesAvailable = byDC @@ -480,3 +502,18 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { return nil } + +// findPreferredNode finds the preferred node for an allocation +func (s *GenericScheduler) findPreferredNode(allocTuple *allocTuple) (node *structs.Node, err error) { + if allocTuple.Alloc != nil { + taskGroup := allocTuple.Alloc.Job.LookupTaskGroup(allocTuple.Alloc.TaskGroup) + if taskGroup == nil { + err = fmt.Errorf("can't find task group of existing allocation %q", allocTuple.Alloc.ID) + return + } + if taskGroup.LocalDisk.Sticky == true { + node, err = s.state.NodeByID(allocTuple.Alloc.NodeID) + } + } + return +} diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 03fe89b11..f5e480802 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -91,6 +91,76 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.Job() + job.TaskGroups[0].LocalDisk.Sticky = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewServiceScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure the plan allocated + plan := h.Plans[0] + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Get an allocation and mark it as failed + alloc := planned[4].Copy() + alloc.ClientStatus = structs.AllocClientStatusFailed + noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to handle the update + eval = &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + } + h1 := NewHarnessWithState(t, h.State) + if err := h1.Process(NewServiceScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have created only one new allocation + plan = h1.Plans[0] + var newPlanned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + newPlanned = append(newPlanned, allocList...) + } + if len(newPlanned) != 1 { + t.Fatalf("bad plan: %#v", plan) + } + // Ensure that the new allocation was placed on the same node as the older + // one + if newPlanned[0].NodeID != alloc.NodeID || newPlanned[0].PreviousAllocation != alloc.ID { + t.Fatalf("expected: %#v, actual: %#v", alloc, newPlanned[0]) + } +} + func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { h := NewHarness(t) @@ -842,7 +912,7 @@ func TestServiceSched_JobModify(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 10 { t.Fatalf("bad: %#v", out) } @@ -933,7 +1003,7 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 3 { t.Fatalf("bad: %#v", out) } @@ -1029,7 +1099,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 0 { t.Fatalf("bad: %#v", out) } @@ -1291,7 +1361,7 @@ func TestServiceSched_JobDeregister(t *testing.T) { } // Ensure no remaining allocations - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 0 { t.Fatalf("bad: %#v", out) } @@ -1494,7 +1564,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 10 { t.Fatalf("bad: %#v", out) } @@ -2074,37 +2144,47 @@ func TestGenericSched_FilterCompleteAllocs(t *testing.T) { } cases := []struct { - Batch bool - Input, Output []*structs.Allocation + Batch bool + Input, Output []*structs.Allocation + TerminalAllocs map[string]*structs.Allocation }{ { - Input: []*structs.Allocation{running}, - Output: []*structs.Allocation{running}, + Input: []*structs.Allocation{running}, + Output: []*structs.Allocation{running}, + TerminalAllocs: map[string]*structs.Allocation{}, }, { Input: []*structs.Allocation{running, desiredStop}, Output: []*structs.Allocation{running}, + TerminalAllocs: map[string]*structs.Allocation{ + desiredStop.Name: desiredStop, + }, }, { - Batch: true, - Input: []*structs.Allocation{running}, - Output: []*structs.Allocation{running}, + Batch: true, + Input: []*structs.Allocation{running}, + Output: []*structs.Allocation{running}, + TerminalAllocs: map[string]*structs.Allocation{}, }, { - Batch: true, - Input: []*structs.Allocation{new, oldSuccessful}, - Output: []*structs.Allocation{new}, + Batch: true, + Input: []*structs.Allocation{new, oldSuccessful}, + Output: []*structs.Allocation{new}, + TerminalAllocs: map[string]*structs.Allocation{}, }, { Batch: true, Input: []*structs.Allocation{unsuccessful}, Output: []*structs.Allocation{}, + TerminalAllocs: map[string]*structs.Allocation{ + unsuccessful.Name: unsuccessful, + }, }, } for i, c := range cases { g := &GenericScheduler{batch: c.Batch} - out := g.filterCompleteAllocs(c.Input) + out, terminalAllocs := g.filterCompleteAllocs(c.Input) if !reflect.DeepEqual(out, c.Output) { t.Log("Got:") @@ -2117,6 +2197,19 @@ func TestGenericSched_FilterCompleteAllocs(t *testing.T) { } t.Fatalf("Case %d failed", i+1) } + + if !reflect.DeepEqual(terminalAllocs, c.TerminalAllocs) { + t.Log("Got:") + for n, a := range terminalAllocs { + t.Logf("%v: %#v", n, a) + } + t.Log("Want:") + for n, a := range c.TerminalAllocs { + t.Logf("%v: %#v", n, a) + } + t.Fatalf("Case %d failed", i+1) + } + } } diff --git a/scheduler/stack.go b/scheduler/stack.go index bea8c91e6..fa76cbe21 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -171,6 +171,19 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso return option, tgConstr.size } +// SelectPreferredNode returns a node where an allocation of the task group can +// be placed, the node passed to it is preferred over the other available nodes +func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*structs.Node) (*RankedNode, *structs.Resources) { + originalNodes := s.source.nodes + s.source.SetNodes(nodes) + if option, resources := s.Select(tg); option != nil { + s.source.SetNodes(originalNodes) + return option, resources + } + s.source.SetNodes(originalNodes) + return s.Select(tg) +} + // SystemStack is the Stack used for the System scheduler. It is designed to // attempt to make placements on all nodes. type SystemStack struct { diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index dfab71e04..e94b8c9ec 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -125,6 +125,42 @@ func TestServiceStack_Select_Size(t *testing.T) { } } +func TestServiceStack_Select_PreferringNodes(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) + + job := mock.Job() + stack.SetJob(job) + + // Create a preferred node + preferredNode := mock.Node() + option, _ := stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode}) + if option == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + if option.Node.ID != preferredNode.ID { + t.Fatalf("expected: %v, actual: %v", option.Node.ID, preferredNode.ID) + } + + // Change the preferred node's kernel to windows and ensure the allocations + // are placed elsewhere + preferredNode1 := preferredNode.Copy() + preferredNode1.Attributes["kernel.name"] = "windows" + preferredNode1.ComputeClass() + option, _ = stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode1}) + if option == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if option.Node.ID != nodes[0].ID { + t.Fatalf("expected: %#v, actual: %#v", nodes[0], option.Node) + } +} + func TestServiceStack_Select_MetricsReset(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 098560e35..4f7c16919 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -196,10 +196,10 @@ func (s *SystemScheduler) computeJobAllocs() error { updateNonTerminalAllocsToLost(s.plan, tainted, allocs) // Filter out the allocations in a terminal state - allocs = structs.FilterTerminalAllocs(allocs) + allocs, terminalAllocs := structs.FilterTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs) + diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs, terminalAllocs) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) // Add all the allocs to stop diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 043fbe854..664bfe609 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -80,6 +80,76 @@ func TestSystemSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemeSched_JobRegister_StickyAllocs(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + job.TaskGroups[0].LocalDisk.Sticky = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewSystemScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure the plan allocated + plan := h.Plans[0] + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Get an allocation and mark it as failed + alloc := planned[4].Copy() + alloc.ClientStatus = structs.AllocClientStatusFailed + noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to handle the update + eval = &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + } + h1 := NewHarnessWithState(t, h.State) + if err := h1.Process(NewSystemScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have created only one new allocation + plan = h1.Plans[0] + var newPlanned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + newPlanned = append(newPlanned, allocList...) + } + if len(newPlanned) != 1 { + t.Fatalf("bad plan: %#v", plan) + } + // Ensure that the new allocation was placed on the same node as the older + // one + if newPlanned[0].NodeID != alloc.NodeID || newPlanned[0].PreviousAllocation != alloc.ID { + t.Fatalf("expected: %#v, actual: %#v", alloc, newPlanned[0]) + } +} + func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) { h := NewHarness(t) @@ -364,7 +434,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 11 { t.Fatalf("bad: %#v", out) } @@ -492,7 +562,7 @@ func TestSystemSched_JobModify(t *testing.T) { noErr(t, err) // Ensure all allocations placed - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 10 { t.Fatalf("bad: %#v", out) } @@ -756,7 +826,7 @@ func TestSystemSched_JobDeregister(t *testing.T) { noErr(t, err) // Ensure no remaining allocations - out = structs.FilterTerminalAllocs(out) + out, _ = structs.FilterTerminalAllocs(out) if len(out) != 0 { t.Fatalf("bad: %#v", out) } diff --git a/scheduler/util.go b/scheduler/util.go index 0629686d5..6a85f5dc8 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -60,7 +60,8 @@ func (d *diffResult) Append(other *diffResult) { // (no longer required), those that should be ignored and those that are lost // that need to be replaced (running on a lost node). func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, - required map[string]*structs.TaskGroup, allocs []*structs.Allocation) *diffResult { + required map[string]*structs.TaskGroup, allocs []*structs.Allocation, + terminalAllocs map[string]*structs.Allocation) *diffResult { result := &diffResult{} // Scan the existing updates @@ -143,6 +144,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, result.place = append(result.place, allocTuple{ Name: name, TaskGroup: tg, + Alloc: terminalAllocs[name], }) } } @@ -152,7 +154,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, // diffSystemAllocs is like diffAllocs however, the allocations in the // diffResult contain the specific nodeID they should be allocated on. func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node, - allocs []*structs.Allocation) *diffResult { + allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult { // Build a mapping of nodes to all their allocs. nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) @@ -172,12 +174,18 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[ result := &diffResult{} for nodeID, allocs := range nodeAllocs { - diff := diffAllocs(job, taintedNodes, required, allocs) + diff := diffAllocs(job, taintedNodes, required, allocs, terminalAllocs) // Mark the alloc as being for a specific node. for i := range diff.place { alloc := &diff.place[i] - alloc.Alloc = &structs.Allocation{NodeID: nodeID} + + // If the new allocation isn't annotated with a previous allocation + // or if the previous allocation isn't from the same node then we + // annotate the allocTuple with a new Allocation + if alloc.Alloc == nil || alloc.Alloc.NodeID != nodeID { + alloc.Alloc = &structs.Allocation{NodeID: nodeID} + } } // Migrate does not apply to system jobs and instead should be marked as diff --git a/scheduler/util_test.go b/scheduler/util_test.go index a63858613..2a8e11cdb 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -99,7 +99,29 @@ func TestDiffAllocs(t *testing.T) { }, } - diff := diffAllocs(job, tainted, required, allocs) + // Have three terminal allocs + terminalAllocs := map[string]*structs.Allocation{ + "my-job.web[4]": &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "zip", + Name: "my-job.web[4]", + Job: job, + }, + "my-job.web[5]": &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "zip", + Name: "my-job.web[5]", + Job: job, + }, + "my-job.web[6]": &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "zip", + Name: "my-job.web[6]", + Job: job, + }, + } + + diff := diffAllocs(job, tainted, required, allocs, terminalAllocs) place := diff.place update := diff.update migrate := diff.migrate @@ -136,13 +158,26 @@ func TestDiffAllocs(t *testing.T) { if len(place) != 6 { t.Fatalf("bad: %#v", place) } + + // Ensure that the allocations which are replacements of terminal allocs are + // annotated + for name, alloc := range terminalAllocs { + for _, allocTuple := range diff.place { + if name == allocTuple.Name { + if !reflect.DeepEqual(alloc, allocTuple.Alloc) { + t.Fatalf("expected: %#v, actual: %#v", alloc, allocTuple.Alloc) + } + } + } + } } func TestDiffSystemAllocs(t *testing.T) { job := mock.SystemJob() // Create three alive nodes. - nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}} + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}, + {ID: "pipe"}} // The "old" job has a previous modify index oldJob := new(structs.Job) @@ -193,7 +228,17 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, tainted, allocs) + // Have three terminal allocs + terminalAllocs := map[string]*structs.Allocation{ + "my-job.web[0]": &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "pipe", + Name: "my-job.web[0]", + Job: job, + }, + } + + diff := diffSystemAllocs(job, nodes, tainted, allocs, terminalAllocs) place := diff.place update := diff.update migrate := diff.migrate @@ -227,9 +272,21 @@ func TestDiffSystemAllocs(t *testing.T) { } // We should place 1 - if len(place) != 1 { + if len(place) != 2 { t.Fatalf("bad: %#v", place) } + + // Ensure that the allocations which are replacements of terminal allocs are + // annotated + for _, alloc := range terminalAllocs { + for _, allocTuple := range diff.place { + if alloc.NodeID == allocTuple.Alloc.NodeID { + if !reflect.DeepEqual(alloc, allocTuple.Alloc) { + t.Fatalf("expected: %#v, actual: %#v", alloc, allocTuple.Alloc) + } + } + } + } } func TestReadyNodesInDCs(t *testing.T) { From 64c57d9136dcc80a5102d2889a98691fea8cad10 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 31 Aug 2016 13:40:43 -0700 Subject: [PATCH 2/3] Added a test --- nomad/structs/funcs_test.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index b0f1f2194..36e30f6e4 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -1,6 +1,7 @@ package structs import ( + "fmt" "regexp" "testing" ) @@ -24,7 +25,11 @@ func TestRemoveAllocs(t *testing.T) { func TestFilterTerminalAllocs(t *testing.T) { l := []*Allocation{ - &Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict}, + &Allocation{ + ID: "bar", + Name: "myname1", + DesiredStatus: AllocDesiredStatusEvict, + }, &Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop}, &Allocation{ ID: "foo", @@ -33,8 +38,17 @@ func TestFilterTerminalAllocs(t *testing.T) { }, &Allocation{ ID: "bam", + Name: "myname", DesiredStatus: AllocDesiredStatusRun, ClientStatus: AllocClientStatusComplete, + CreateIndex: 5, + }, + &Allocation{ + ID: "lol", + Name: "myname", + DesiredStatus: AllocDesiredStatusRun, + ClientStatus: AllocClientStatusComplete, + CreateIndex: 2, }, } @@ -46,9 +60,17 @@ func TestFilterTerminalAllocs(t *testing.T) { t.Fatalf("bad: %#v", out) } - if len(terminalAllocs) != 1 { + if len(terminalAllocs) != 3 { + for _, o := range terminalAllocs { + fmt.Printf("%#v \n", o) + } + t.Fatalf("bad: %#v", terminalAllocs) } + + if terminalAllocs["myname"].ID != "bam" { + t.Fatalf("bad: %#v", terminalAllocs["myname"]) + } } func TestAllocsFit_PortsOvercommitted(t *testing.T) { From d94bb45ad3e19d074952af779054fe395b136891 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 31 Aug 2016 14:06:31 -0700 Subject: [PATCH 3/3] Added some more comments --- nomad/structs/funcs.go | 2 +- scheduler/util.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 5822d22f9..c4aaef7f5 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -29,7 +29,7 @@ func RemoveAllocs(alloc []*Allocation, remove []*Allocation) []*Allocation { } // FilterTerminalAllocs filters out all allocations in a terminal state and -// returns the terminal allocations +// returns the latest terminal allocations func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) { terminalAllocsByName := make(map[string]*Allocation) n := len(allocs) diff --git a/scheduler/util.go b/scheduler/util.go index 6a85f5dc8..6218c1c9a 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -59,6 +59,13 @@ func (d *diffResult) Append(other *diffResult) { // need to be migrated (node is draining), the allocs that need to be evicted // (no longer required), those that should be ignored and those that are lost // that need to be replaced (running on a lost node). +// +// job is the job whose allocs is going to be diff-ed. +// taintedNodes is an index of the nodes which are either down or in drain mode +// by name. +// required is a set of allocations that must exist. +// allocs is a list of non terminal allocations. +// terminalAllocs is an index of the latest terminal allocations by name. func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, required map[string]*structs.TaskGroup, allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult { @@ -153,6 +160,13 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, // diffSystemAllocs is like diffAllocs however, the allocations in the // diffResult contain the specific nodeID they should be allocated on. +// +// job is the job whose allocs is going to be diff-ed. +// nodes is a list of nodes in ready state. +// taintedNodes is an index of the nodes which are either down or in drain mode +// by name. +// allocs is a list of non terminal allocations. +// terminalAllocs is an index of the latest terminal allocations by name. func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node, allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult {