Merge pull request #1667 from hashicorp/f-sticky-scheduler

Implemented SetPrefferingNodes in stack
This commit is contained in:
Diptanu Choudhury 2016-08-31 14:06:59 -07:00 committed by GitHub
commit 2cb81fb9be
11 changed files with 405 additions and 41 deletions

View file

@ -28,17 +28,29 @@ func RemoveAllocs(alloc []*Allocation, remove []*Allocation) []*Allocation {
return alloc return alloc
} }
// FilterTerminalAllocs filters out all allocations in a terminal state // FilterTerminalAllocs filters out all allocations in a terminal state and
func FilterTerminalAllocs(allocs []*Allocation) []*Allocation { // returns the latest terminal allocations
func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) {
terminalAllocsByName := make(map[string]*Allocation)
n := len(allocs) n := len(allocs)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if allocs[i].TerminalStatus() { if allocs[i].TerminalStatus() {
// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}
// Remove the allocation
allocs[i], allocs[n-1] = allocs[n-1], nil allocs[i], allocs[n-1] = allocs[n-1], nil
i-- i--
n-- n--
} }
} }
return allocs[:n] return allocs[:n], terminalAllocsByName
} }
// AllocsFit checks if a given set of allocations will fit on a node. // AllocsFit checks if a given set of allocations will fit on a node.

View file

@ -1,6 +1,7 @@
package structs package structs
import ( import (
"fmt"
"regexp" "regexp"
"testing" "testing"
) )
@ -24,7 +25,11 @@ func TestRemoveAllocs(t *testing.T) {
func TestFilterTerminalAllocs(t *testing.T) { func TestFilterTerminalAllocs(t *testing.T) {
l := []*Allocation{ l := []*Allocation{
&Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict}, &Allocation{
ID: "bar",
Name: "myname1",
DesiredStatus: AllocDesiredStatusEvict,
},
&Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop}, &Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop},
&Allocation{ &Allocation{
ID: "foo", ID: "foo",
@ -33,18 +38,39 @@ func TestFilterTerminalAllocs(t *testing.T) {
}, },
&Allocation{ &Allocation{
ID: "bam", ID: "bam",
Name: "myname",
DesiredStatus: AllocDesiredStatusRun, DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusComplete, ClientStatus: AllocClientStatusComplete,
CreateIndex: 5,
},
&Allocation{
ID: "lol",
Name: "myname",
DesiredStatus: AllocDesiredStatusRun,
ClientStatus: AllocClientStatusComplete,
CreateIndex: 2,
}, },
} }
out := FilterTerminalAllocs(l) out, terminalAllocs := FilterTerminalAllocs(l)
if len(out) != 1 { if len(out) != 1 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
if out[0].ID != "foo" { if out[0].ID != "foo" {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
if len(terminalAllocs) != 3 {
for _, o := range terminalAllocs {
fmt.Printf("%#v \n", o)
}
t.Fatalf("bad: %#v", terminalAllocs)
}
if terminalAllocs["myname"].ID != "bam" {
t.Fatalf("bad: %#v", terminalAllocs["myname"])
}
} }
func TestAllocsFit_PortsOvercommitted(t *testing.T) { func TestAllocsFit_PortsOvercommitted(t *testing.T) {

View file

@ -1,7 +1,6 @@
package scheduler package scheduler
import ( import (
"fmt"
"log" "log"
"regexp" "regexp"
@ -269,7 +268,6 @@ func (e *EvalEligibility) JobStatus(class string) ComputedClassFeasibility {
// will not have a computed class. The safest value to return is the escaped // will not have a computed class. The safest value to return is the escaped
// case, since it disables any optimization. // case, since it disables any optimization.
if e.jobEscaped || class == "" { if e.jobEscaped || class == "" {
fmt.Println(e.jobEscaped, class)
return EvalComputedClassEscaped return EvalComputedClassEscaped
} }

View file

@ -278,7 +278,7 @@ func (s *GenericScheduler) process() (bool, error) {
// filterCompleteAllocs filters allocations that are terminal and should be // filterCompleteAllocs filters allocations that are terminal and should be
// re-placed. // re-placed.
func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation { func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) {
filter := func(a *structs.Allocation) bool { filter := func(a *structs.Allocation) bool {
if s.batch { if s.batch {
// Allocs from batch jobs should be filtered when the desired status // Allocs from batch jobs should be filtered when the desired status
@ -303,9 +303,20 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []
return a.TerminalStatus() return a.TerminalStatus()
} }
terminalAllocsByName := make(map[string]*structs.Allocation)
n := len(allocs) n := len(allocs)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if filter(allocs[i]) { if filter(allocs[i]) {
// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}
// Remove the allocation
allocs[i], allocs[n-1] = allocs[n-1], nil allocs[i], allocs[n-1] = allocs[n-1], nil
i-- i--
n-- n--
@ -330,7 +341,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []
} }
} }
return filtered return filtered, terminalAllocsByName
} }
// computeJobAllocs is used to reconcile differences between the job, // computeJobAllocs is used to reconcile differences between the job,
@ -361,10 +372,10 @@ func (s *GenericScheduler) computeJobAllocs() error {
updateNonTerminalAllocsToLost(s.plan, tainted, allocs) updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state // Filter out the allocations in a terminal state
allocs = s.filterCompleteAllocs(allocs) allocs, terminalAllocs := s.filterCompleteAllocs(allocs)
// Diff the required and existing allocations // Diff the required and existing allocations
diff := diffAllocs(s.job, tainted, groups, allocs) diff := diffAllocs(s.job, tainted, groups, allocs, terminalAllocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// Add all the allocs to stop // Add all the allocs to stop
@ -435,8 +446,19 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
continue continue
} }
// Find the preferred node
preferredNode, err := s.findPreferredNode(&missing)
if err != nil {
return err
}
// Attempt to match the task group // Attempt to match the task group
option, _ := s.stack.Select(missing.TaskGroup) var option *RankedNode
if preferredNode != nil {
option, _ = s.stack.SelectPreferringNodes(missing.TaskGroup, []*structs.Node{preferredNode})
} else {
option, _ = s.stack.Select(missing.TaskGroup)
}
// Store the available nodes by datacenter // Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC s.ctx.Metrics().NodesAvailable = byDC
@ -480,3 +502,18 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
return nil return nil
} }
// findPreferredNode finds the preferred node for an allocation
func (s *GenericScheduler) findPreferredNode(allocTuple *allocTuple) (node *structs.Node, err error) {
if allocTuple.Alloc != nil {
taskGroup := allocTuple.Alloc.Job.LookupTaskGroup(allocTuple.Alloc.TaskGroup)
if taskGroup == nil {
err = fmt.Errorf("can't find task group of existing allocation %q", allocTuple.Alloc.ID)
return
}
if taskGroup.LocalDisk.Sticky == true {
node, err = s.state.NodeByID(allocTuple.Alloc.NodeID)
}
}
return
}

View file

@ -91,6 +91,76 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete) h.AssertEvalStatus(t, structs.EvalStatusComplete)
} }
func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.Job()
job.TaskGroups[0].LocalDisk.Sticky = true
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure the plan allocated
plan := h.Plans[0]
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
// Get an allocation and mark it as failed
alloc := planned[4].Copy()
alloc.ClientStatus = structs.AllocClientStatusFailed
noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
}
h1 := NewHarnessWithState(t, h.State)
if err := h1.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have created only one new allocation
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 1 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocation was placed on the same node as the older
// one
if newPlanned[0].NodeID != alloc.NodeID || newPlanned[0].PreviousAllocation != alloc.ID {
t.Fatalf("expected: %#v, actual: %#v", alloc, newPlanned[0])
}
}
func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) {
h := NewHarness(t) h := NewHarness(t)
@ -842,7 +912,7 @@ func TestServiceSched_JobModify(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 10 { if len(out) != 10 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -933,7 +1003,7 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 3 { if len(out) != 3 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -1029,7 +1099,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 { if len(out) != 0 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -1291,7 +1361,7 @@ func TestServiceSched_JobDeregister(t *testing.T) {
} }
// Ensure no remaining allocations // Ensure no remaining allocations
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 { if len(out) != 0 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -1494,7 +1564,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 10 { if len(out) != 10 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -2076,35 +2146,45 @@ func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
cases := []struct { cases := []struct {
Batch bool Batch bool
Input, Output []*structs.Allocation Input, Output []*structs.Allocation
TerminalAllocs map[string]*structs.Allocation
}{ }{
{ {
Input: []*structs.Allocation{running}, Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running}, Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{},
}, },
{ {
Input: []*structs.Allocation{running, desiredStop}, Input: []*structs.Allocation{running, desiredStop},
Output: []*structs.Allocation{running}, Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{
desiredStop.Name: desiredStop,
},
}, },
{ {
Batch: true, Batch: true,
Input: []*structs.Allocation{running}, Input: []*structs.Allocation{running},
Output: []*structs.Allocation{running}, Output: []*structs.Allocation{running},
TerminalAllocs: map[string]*structs.Allocation{},
}, },
{ {
Batch: true, Batch: true,
Input: []*structs.Allocation{new, oldSuccessful}, Input: []*structs.Allocation{new, oldSuccessful},
Output: []*structs.Allocation{new}, Output: []*structs.Allocation{new},
TerminalAllocs: map[string]*structs.Allocation{},
}, },
{ {
Batch: true, Batch: true,
Input: []*structs.Allocation{unsuccessful}, Input: []*structs.Allocation{unsuccessful},
Output: []*structs.Allocation{}, Output: []*structs.Allocation{},
TerminalAllocs: map[string]*structs.Allocation{
unsuccessful.Name: unsuccessful,
},
}, },
} }
for i, c := range cases { for i, c := range cases {
g := &GenericScheduler{batch: c.Batch} g := &GenericScheduler{batch: c.Batch}
out := g.filterCompleteAllocs(c.Input) out, terminalAllocs := g.filterCompleteAllocs(c.Input)
if !reflect.DeepEqual(out, c.Output) { if !reflect.DeepEqual(out, c.Output) {
t.Log("Got:") t.Log("Got:")
@ -2117,6 +2197,19 @@ func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
} }
t.Fatalf("Case %d failed", i+1) t.Fatalf("Case %d failed", i+1)
} }
if !reflect.DeepEqual(terminalAllocs, c.TerminalAllocs) {
t.Log("Got:")
for n, a := range terminalAllocs {
t.Logf("%v: %#v", n, a)
}
t.Log("Want:")
for n, a := range c.TerminalAllocs {
t.Logf("%v: %#v", n, a)
}
t.Fatalf("Case %d failed", i+1)
}
} }
} }

View file

@ -171,6 +171,19 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
return option, tgConstr.size return option, tgConstr.size
} }
// SelectPreferredNode returns a node where an allocation of the task group can
// be placed, the node passed to it is preferred over the other available nodes
func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*structs.Node) (*RankedNode, *structs.Resources) {
originalNodes := s.source.nodes
s.source.SetNodes(nodes)
if option, resources := s.Select(tg); option != nil {
s.source.SetNodes(originalNodes)
return option, resources
}
s.source.SetNodes(originalNodes)
return s.Select(tg)
}
// SystemStack is the Stack used for the System scheduler. It is designed to // SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes. // attempt to make placements on all nodes.
type SystemStack struct { type SystemStack struct {

View file

@ -125,6 +125,42 @@ func TestServiceStack_Select_Size(t *testing.T) {
} }
} }
func TestServiceStack_Select_PreferringNodes(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
}
stack := NewGenericStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()
stack.SetJob(job)
// Create a preferred node
preferredNode := mock.Node()
option, _ := stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode})
if option == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
if option.Node.ID != preferredNode.ID {
t.Fatalf("expected: %v, actual: %v", option.Node.ID, preferredNode.ID)
}
// Change the preferred node's kernel to windows and ensure the allocations
// are placed elsewhere
preferredNode1 := preferredNode.Copy()
preferredNode1.Attributes["kernel.name"] = "windows"
preferredNode1.ComputeClass()
option, _ = stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode1})
if option == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
if option.Node.ID != nodes[0].ID {
t.Fatalf("expected: %#v, actual: %#v", nodes[0], option.Node)
}
}
func TestServiceStack_Select_MetricsReset(t *testing.T) { func TestServiceStack_Select_MetricsReset(t *testing.T) {
_, ctx := testContext(t) _, ctx := testContext(t)
nodes := []*structs.Node{ nodes := []*structs.Node{

View file

@ -196,10 +196,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
updateNonTerminalAllocsToLost(s.plan, tainted, allocs) updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state // Filter out the allocations in a terminal state
allocs = structs.FilterTerminalAllocs(allocs) allocs, terminalAllocs := structs.FilterTerminalAllocs(allocs)
// Diff the required and existing allocations // Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs) diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs, terminalAllocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// Add all the allocs to stop // Add all the allocs to stop

View file

@ -80,6 +80,76 @@ func TestSystemSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete) h.AssertEvalStatus(t, structs.EvalStatusComplete)
} }
func TestSystemeSched_JobRegister_StickyAllocs(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.SystemJob()
job.TaskGroups[0].LocalDisk.Sticky = true
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure the plan allocated
plan := h.Plans[0]
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
// Get an allocation and mark it as failed
alloc := planned[4].Copy()
alloc.ClientStatus = structs.AllocClientStatusFailed
noErr(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
}
h1 := NewHarnessWithState(t, h.State)
if err := h1.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have created only one new allocation
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 1 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocation was placed on the same node as the older
// one
if newPlanned[0].NodeID != alloc.NodeID || newPlanned[0].PreviousAllocation != alloc.ID {
t.Fatalf("expected: %#v, actual: %#v", alloc, newPlanned[0])
}
}
func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) { func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) {
h := NewHarness(t) h := NewHarness(t)
@ -364,7 +434,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 11 { if len(out) != 11 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -492,7 +562,7 @@ func TestSystemSched_JobModify(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure all allocations placed // Ensure all allocations placed
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 10 { if len(out) != 10 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -756,7 +826,7 @@ func TestSystemSched_JobDeregister(t *testing.T) {
noErr(t, err) noErr(t, err)
// Ensure no remaining allocations // Ensure no remaining allocations
out = structs.FilterTerminalAllocs(out) out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 { if len(out) != 0 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }

View file

@ -59,8 +59,16 @@ func (d *diffResult) Append(other *diffResult) {
// need to be migrated (node is draining), the allocs that need to be evicted // need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), those that should be ignored and those that are lost // (no longer required), those that should be ignored and those that are lost
// that need to be replaced (running on a lost node). // that need to be replaced (running on a lost node).
//
// job is the job whose allocs is going to be diff-ed.
// taintedNodes is an index of the nodes which are either down or in drain mode
// by name.
// required is a set of allocations that must exist.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
required map[string]*structs.TaskGroup, allocs []*structs.Allocation) *diffResult { required map[string]*structs.TaskGroup, allocs []*structs.Allocation,
terminalAllocs map[string]*structs.Allocation) *diffResult {
result := &diffResult{} result := &diffResult{}
// Scan the existing updates // Scan the existing updates
@ -143,6 +151,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
result.place = append(result.place, allocTuple{ result.place = append(result.place, allocTuple{
Name: name, Name: name,
TaskGroup: tg, TaskGroup: tg,
Alloc: terminalAllocs[name],
}) })
} }
} }
@ -151,8 +160,15 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
// diffSystemAllocs is like diffAllocs however, the allocations in the // diffSystemAllocs is like diffAllocs however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on. // diffResult contain the specific nodeID they should be allocated on.
//
// job is the job whose allocs is going to be diff-ed.
// nodes is a list of nodes in ready state.
// taintedNodes is an index of the nodes which are either down or in drain mode
// by name.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node, func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node,
allocs []*structs.Allocation) *diffResult { allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult {
// Build a mapping of nodes to all their allocs. // Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
@ -172,13 +188,19 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
result := &diffResult{} result := &diffResult{}
for nodeID, allocs := range nodeAllocs { for nodeID, allocs := range nodeAllocs {
diff := diffAllocs(job, taintedNodes, required, allocs) diff := diffAllocs(job, taintedNodes, required, allocs, terminalAllocs)
// Mark the alloc as being for a specific node. // Mark the alloc as being for a specific node.
for i := range diff.place { for i := range diff.place {
alloc := &diff.place[i] alloc := &diff.place[i]
// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
if alloc.Alloc == nil || alloc.Alloc.NodeID != nodeID {
alloc.Alloc = &structs.Allocation{NodeID: nodeID} alloc.Alloc = &structs.Allocation{NodeID: nodeID}
} }
}
// Migrate does not apply to system jobs and instead should be marked as // Migrate does not apply to system jobs and instead should be marked as
// stop because if a node is tainted, the job is invalid on that node. // stop because if a node is tainted, the job is invalid on that node.

View file

@ -99,7 +99,29 @@ func TestDiffAllocs(t *testing.T) {
}, },
} }
diff := diffAllocs(job, tainted, required, allocs) // Have three terminal allocs
terminalAllocs := map[string]*structs.Allocation{
"my-job.web[4]": &structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "zip",
Name: "my-job.web[4]",
Job: job,
},
"my-job.web[5]": &structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "zip",
Name: "my-job.web[5]",
Job: job,
},
"my-job.web[6]": &structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "zip",
Name: "my-job.web[6]",
Job: job,
},
}
diff := diffAllocs(job, tainted, required, allocs, terminalAllocs)
place := diff.place place := diff.place
update := diff.update update := diff.update
migrate := diff.migrate migrate := diff.migrate
@ -136,13 +158,26 @@ func TestDiffAllocs(t *testing.T) {
if len(place) != 6 { if len(place) != 6 {
t.Fatalf("bad: %#v", place) t.Fatalf("bad: %#v", place)
} }
// Ensure that the allocations which are replacements of terminal allocs are
// annotated
for name, alloc := range terminalAllocs {
for _, allocTuple := range diff.place {
if name == allocTuple.Name {
if !reflect.DeepEqual(alloc, allocTuple.Alloc) {
t.Fatalf("expected: %#v, actual: %#v", alloc, allocTuple.Alloc)
}
}
}
}
} }
func TestDiffSystemAllocs(t *testing.T) { func TestDiffSystemAllocs(t *testing.T) {
job := mock.SystemJob() job := mock.SystemJob()
// Create three alive nodes. // Create three alive nodes.
nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}} nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"},
{ID: "pipe"}}
// The "old" job has a previous modify index // The "old" job has a previous modify index
oldJob := new(structs.Job) oldJob := new(structs.Job)
@ -193,7 +228,17 @@ func TestDiffSystemAllocs(t *testing.T) {
}, },
} }
diff := diffSystemAllocs(job, nodes, tainted, allocs) // Have three terminal allocs
terminalAllocs := map[string]*structs.Allocation{
"my-job.web[0]": &structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "pipe",
Name: "my-job.web[0]",
Job: job,
},
}
diff := diffSystemAllocs(job, nodes, tainted, allocs, terminalAllocs)
place := diff.place place := diff.place
update := diff.update update := diff.update
migrate := diff.migrate migrate := diff.migrate
@ -227,9 +272,21 @@ func TestDiffSystemAllocs(t *testing.T) {
} }
// We should place 1 // We should place 1
if len(place) != 1 { if len(place) != 2 {
t.Fatalf("bad: %#v", place) t.Fatalf("bad: %#v", place)
} }
// Ensure that the allocations which are replacements of terminal allocs are
// annotated
for _, alloc := range terminalAllocs {
for _, allocTuple := range diff.place {
if alloc.NodeID == allocTuple.Alloc.NodeID {
if !reflect.DeepEqual(alloc, allocTuple.Alloc) {
t.Fatalf("expected: %#v, actual: %#v", alloc, allocTuple.Alloc)
}
}
}
}
} }
func TestReadyNodesInDCs(t *testing.T) { func TestReadyNodesInDCs(t *testing.T) {