Refactor task group constraint logic in generic/system stack

This commit is contained in:
Alex Dadgar 2015-10-16 14:00:51 -07:00
parent ab9acb9edf
commit 1ec921a3c2
3 changed files with 91 additions and 28 deletions

View file

@ -134,21 +134,12 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
s.ctx.Reset() s.ctx.Reset()
start := time.Now() start := time.Now()
// Collect the constraints, drivers and resources required by each // Get the task groups constraints.
// sub-task to aggregate the TaskGroup totals tgConstr := taskGroupConstraints(tg)
constr := make([]*structs.Constraint, 0, len(tg.Constraints))
drivers := make(map[string]struct{})
size := new(structs.Resources)
constr = append(constr, tg.Constraints...)
for _, task := range tg.Tasks {
drivers[task.Driver] = struct{}{}
constr = append(constr, task.Constraints...)
size.Add(task.Resources)
}
// Update the parameters of iterators // Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(drivers) s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(constr) s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.binPack.SetTasks(tg.Tasks) s.binPack.SetTasks(tg.Tasks)
// Find the node with the max score // Find the node with the max score
@ -163,7 +154,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
// Store the compute time // Store the compute time
s.ctx.Metrics().AllocationTime = time.Since(start) s.ctx.Metrics().AllocationTime = time.Since(start)
return option, size return option, tgConstr.size
} }
// SystemStack is the Stack used for the System scheduler. It is designed to // SystemStack is the Stack used for the System scheduler. It is designed to
@ -226,21 +217,12 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou
s.ctx.Reset() s.ctx.Reset()
start := time.Now() start := time.Now()
// Collect the constraints, drivers and resources required by each // Get the task groups constraints.
// sub-task to aggregate the TaskGroup totals tgConstr := taskGroupConstraints(tg)
constr := make([]*structs.Constraint, 0, len(tg.Constraints))
drivers := make(map[string]struct{})
size := new(structs.Resources)
constr = append(constr, tg.Constraints...)
for _, task := range tg.Tasks {
drivers[task.Driver] = struct{}{}
constr = append(constr, task.Constraints...)
size.Add(task.Resources)
}
// Update the parameters of iterators // Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(drivers) s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(constr) s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.binPack.SetTasks(tg.Tasks) s.binPack.SetTasks(tg.Tasks)
// Get the next option that satisfies the constraints. // Get the next option that satisfies the constraints.
@ -255,5 +237,5 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou
// Store the compute time // Store the compute time
s.ctx.Metrics().AllocationTime = time.Since(start) s.ctx.Metrics().AllocationTime = time.Since(start)
return option, size return option, tgConstr.size
} }

View file

@ -408,3 +408,34 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
*limit = 0 *limit = 0
return true return true
} }
// tgConstrainTuple is used to store the total constraints of a task group.
type tgConstrainTuple struct {
// Holds the combined constraints of the task group and all it's sub-tasks.
constraints []*structs.Constraint
// The set of required drivers within the task group.
drivers map[string]struct{}
// The combined resources of all tasks within the task group.
size *structs.Resources
}
// taskGroupConstraints collects the constraints, drivers and resources required by each
// sub-task to aggregate the TaskGroup totals
func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
c := tgConstrainTuple{
constraints: make([]*structs.Constraint, 0, len(tg.Constraints)),
drivers: make(map[string]struct{}),
size: new(structs.Resources),
}
c.constraints = append(c.constraints, tg.Constraints...)
for _, task := range tg.Tasks {
c.drivers[task.Driver] = struct{}{}
c.constraints = append(c.constraints, task.Constraints...)
c.size.Add(task.Resources)
}
return c
}

View file

@ -340,3 +340,53 @@ func TestTasksUpdated(t *testing.T) {
t.Fatalf("bad") t.Fatalf("bad")
} }
} }
func TestTaskGroupConstraints(t *testing.T) {
constr := &structs.Constraint{Hard: true}
constr2 := &structs.Constraint{LTarget: "foo"}
constr3 := &structs.Constraint{Weight: 10}
tg := &structs.TaskGroup{
Name: "web",
Count: 10,
Constraints: []*structs.Constraint{constr},
Tasks: []*structs.Task{
&structs.Task{
Driver: "exec",
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
Constraints: []*structs.Constraint{constr2},
},
&structs.Task{
Driver: "docker",
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
Constraints: []*structs.Constraint{constr3},
},
},
}
// Build the expected values.
expConstr := []*structs.Constraint{constr, constr2, constr3}
expDrivers := map[string]struct{}{"exec": struct{}{}, "docker": struct{}{}}
expSize := &structs.Resources{
CPU: 1000,
MemoryMB: 512,
}
actConstrains := taskGroupConstraints(tg)
if !reflect.DeepEqual(actConstrains.constraints, expConstr) {
t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.constraints, expConstr)
}
if !reflect.DeepEqual(actConstrains.drivers, expDrivers) {
t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.drivers, expDrivers)
}
if !reflect.DeepEqual(actConstrains.size, expSize) {
t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.size, expSize)
}
}