diff --git a/scheduler/rank.go b/scheduler/rank.go index 740a68a11..3f16c09bf 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -10,8 +10,9 @@ import ( // along with a node when iterating. This state can be modified as // various rank methods are applied. type RankedNode struct { - Node *structs.Node - Score float64 + Node *structs.Node + Score float64 + TaskResources map[string]*structs.Resources // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. @@ -35,6 +36,14 @@ func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error) return p, nil } +func (r *RankedNode) SetTaskResources(task *structs.Task, + resource *structs.Resources) { + if r.TaskResources == nil { + r.TaskResources = make(map[string]*structs.Resources) + } + r.TaskResources[task.Name] = resource +} + // RankFeasibleIterator is used to iteratively yield nodes along // with ranking metadata. The iterators may manage some state for // performance optimizations. @@ -122,35 +131,35 @@ func (iter *StaticRankIterator) Reset() { // BinPackIterator is a RankIterator that scores potential options // based on a bin-packing algorithm. type BinPackIterator struct { - ctx Context - source RankIterator - resources *structs.Resources - evict bool - priority int + ctx Context + source RankIterator + evict bool + priority int + tasks []*structs.Task } -// NewBinPackIterator returns a BinPackIterator which tries to fit the given -// resources, potentially evicting other tasks based on a given priority. -func NewBinPackIterator(ctx Context, source RankIterator, resources *structs.Resources, evict bool, priority int) *BinPackIterator { +// NewBinPackIterator returns a BinPackIterator which tries to fit tasks +// potentially evicting other tasks based on a given priority. +func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator { iter := &BinPackIterator{ - ctx: ctx, - source: source, - resources: resources, - evict: evict, - priority: priority, + ctx: ctx, + source: source, + evict: evict, + priority: priority, } return iter } -func (iter *BinPackIterator) SetResources(r *structs.Resources) { - iter.resources = r -} - func (iter *BinPackIterator) SetPriority(p int) { iter.priority = p } +func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) { + iter.tasks = tasks +} + func (iter *BinPackIterator) Next() *RankedNode { +OUTER: for { // Get the next potential option option := iter.source.Next() @@ -167,8 +176,42 @@ func (iter *BinPackIterator) Next() *RankedNode { continue } + // Index the existing network usage + netIdx := NewNetworkIndex() + netIdx.SetNode(option.Node) + netIdx.AddAllocs(proposed) + + // Assign the resources for each task + total := new(structs.Resources) + for _, task := range iter.tasks { + taskResources := task.Resources.Copy() + + // Check if we need a network resource + if len(taskResources.Networks) > 0 { + ask := taskResources.Networks[0] + offer := netIdx.AssignNetwork(ask) + if offer == nil { + iter.ctx.Metrics().FilterNode(option.Node, + "failed network offer") + continue OUTER + } + + // Reserve this to prevent another task from colliding + netIdx.AddReserved(offer) + + // Update the network ask to the offer + taskResources.Networks = []*structs.NetworkResource{offer} + } + + // Store the task resource + option.SetTaskResources(task, taskResources) + + // Accumulate the total resource requirement + total.Add(taskResources) + } + // Add the resources we are trying to fit - proposed = append(proposed, &structs.Allocation{Resources: iter.resources}) + proposed = append(proposed, &structs.Allocation{Resources: total}) // Check if these allocations fit, if they do not, simply skip this node fit, util, _ := structs.AllocsFit(option.Node, proposed) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 69bcdb452..605902ed4 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -68,11 +68,16 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) - resources := &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + task := &structs.Task{ + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, } - binp := NewBinPackIterator(ctx, static, resources, false, 0) + + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTasks([]*structs.Task{task}) out := collectRanked(binp) if len(out) != 2 { @@ -137,11 +142,16 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { }, } - resources := &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + task := &structs.Task{ + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, } - binp := NewBinPackIterator(ctx, static, resources, false, 0) + + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTasks([]*structs.Task{task}) out := collectRanked(binp) if len(out) != 1 { @@ -207,11 +217,16 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) - resources := &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + task := &structs.Task{ + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, } - binp := NewBinPackIterator(ctx, static, resources, false, 0) + + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTasks([]*structs.Task{task}) out := collectRanked(binp) if len(out) != 1 { @@ -280,11 +295,16 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { plan := ctx.Plan() plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1} - resources := &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + task := &structs.Task{ + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, } - binp := NewBinPackIterator(ctx, static, resources, false, 0) + + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTasks([]*structs.Task{task}) out := collectRanked(binp) if len(out) != 2 { diff --git a/scheduler/stack.go b/scheduler/stack.go index fdedc738b..4c5b76ce3 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -76,7 +76,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi // by a particular task group. Only enable eviction for the service // scheduler as that logic is expensive. evict := !batch - s.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0) + s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0) // Apply the job anti-affinity iterator. This is to avoid placing // multiple allocations on the same node for this job. The penalty @@ -149,7 +149,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(drivers) s.taskGroupConstraint.SetConstraints(constr) - s.binPack.SetResources(size) + s.binPack.SetTasks(tg.Tasks) // Find the node with the max score option := s.maxScore.Next()