scheduler: binpacker makes network offers

This commit is contained in:
Armon Dadgar 2015-09-13 14:31:32 -07:00
parent b0eb463823
commit 625308661a
3 changed files with 101 additions and 38 deletions

View file

@ -10,8 +10,9 @@ import (
// along with a node when iterating. This state can be modified as
// various rank methods are applied.
type RankedNode struct {
Node *structs.Node
Score float64
Node *structs.Node
Score float64
TaskResources map[string]*structs.Resources
// Allocs is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
@ -35,6 +36,14 @@ func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error)
return p, nil
}
func (r *RankedNode) SetTaskResources(task *structs.Task,
resource *structs.Resources) {
if r.TaskResources == nil {
r.TaskResources = make(map[string]*structs.Resources)
}
r.TaskResources[task.Name] = resource
}
// RankFeasibleIterator is used to iteratively yield nodes along
// with ranking metadata. The iterators may manage some state for
// performance optimizations.
@ -122,35 +131,35 @@ func (iter *StaticRankIterator) Reset() {
// BinPackIterator is a RankIterator that scores potential options
// based on a bin-packing algorithm.
type BinPackIterator struct {
ctx Context
source RankIterator
resources *structs.Resources
evict bool
priority int
ctx Context
source RankIterator
evict bool
priority int
tasks []*structs.Task
}
// NewBinPackIterator returns a BinPackIterator which tries to fit the given
// resources, potentially evicting other tasks based on a given priority.
func NewBinPackIterator(ctx Context, source RankIterator, resources *structs.Resources, evict bool, priority int) *BinPackIterator {
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks
// potentially evicting other tasks based on a given priority.
func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
iter := &BinPackIterator{
ctx: ctx,
source: source,
resources: resources,
evict: evict,
priority: priority,
ctx: ctx,
source: source,
evict: evict,
priority: priority,
}
return iter
}
func (iter *BinPackIterator) SetResources(r *structs.Resources) {
iter.resources = r
}
func (iter *BinPackIterator) SetPriority(p int) {
iter.priority = p
}
func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) {
iter.tasks = tasks
}
func (iter *BinPackIterator) Next() *RankedNode {
OUTER:
for {
// Get the next potential option
option := iter.source.Next()
@ -167,8 +176,42 @@ func (iter *BinPackIterator) Next() *RankedNode {
continue
}
// Index the existing network usage
netIdx := NewNetworkIndex()
netIdx.SetNode(option.Node)
netIdx.AddAllocs(proposed)
// Assign the resources for each task
total := new(structs.Resources)
for _, task := range iter.tasks {
taskResources := task.Resources.Copy()
// Check if we need a network resource
if len(taskResources.Networks) > 0 {
ask := taskResources.Networks[0]
offer := netIdx.AssignNetwork(ask)
if offer == nil {
iter.ctx.Metrics().FilterNode(option.Node,
"failed network offer")
continue OUTER
}
// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)
// Update the network ask to the offer
taskResources.Networks = []*structs.NetworkResource{offer}
}
// Store the task resource
option.SetTaskResources(task, taskResources)
// Accumulate the total resource requirement
total.Add(taskResources)
}
// Add the resources we are trying to fit
proposed = append(proposed, &structs.Allocation{Resources: iter.resources})
proposed = append(proposed, &structs.Allocation{Resources: total})
// Check if these allocations fit, if they do not, simply skip this node
fit, util, _ := structs.AllocsFit(option.Node, proposed)

View file

@ -68,11 +68,16 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 2 {
@ -137,11 +142,16 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
},
}
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 1 {
@ -207,11 +217,16 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 1 {
@ -280,11 +295,16 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
plan := ctx.Plan()
plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 2 {

View file

@ -76,7 +76,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi
// by a particular task group. Only enable eviction for the service
// scheduler as that logic is expensive.
evict := !batch
s.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0)
s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job. The penalty
@ -149,7 +149,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(drivers)
s.taskGroupConstraint.SetConstraints(constr)
s.binPack.SetResources(size)
s.binPack.SetTasks(tg.Tasks)
// Find the node with the max score
option := s.maxScore.Next()