Scheduler changes to support networks at the task group level

Also includes unit tests for the bin packer and preemption. The tests
verify that network resources specified at the task group level are
properly accounted for.
Preetha Appan 2019-07-03 13:29:47 -05:00 committed by Nick Ethier
parent af7f34604e
commit 99eca85206
5 changed files with 287 additions and 6 deletions
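
For context before the hunks: task-level networks continue to live under AllocatedResources.Tasks, while networks declared on the task group are tracked under AllocatedResources.Shared. A minimal sketch (not part of this commit) using only struct fields that appear in the diff below:

package example

import "github.com/hashicorp/nomad/nomad/structs"

// exampleAllocatedNetworks shows where network resources live on an
// allocation after this change.
func exampleAllocatedNetworks() *structs.AllocatedResources {
	return &structs.AllocatedResources{
		// Per-task networks, as before.
		Tasks: map[string]*structs.AllocatedTaskResources{
			"web": {
				Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 300}},
			},
		},
		// Networks reserved once for the whole task group -- the case this
		// commit teaches the scheduler to account for.
		Shared: structs.AllocatedSharedResources{
			Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 500}},
		},
	}
}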


@@ -113,6 +113,15 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
}
if alloc.AllocatedResources != nil {
// Add network resources that are at the task group level
if len(alloc.AllocatedResources.Shared.Networks) > 0 {
for _, network := range alloc.AllocatedResources.Shared.Networks {
if idx.AddReserved(network) {
collide = true
}
}
}
for _, task := range alloc.AllocatedResources.Tasks {
if len(task.Networks) == 0 {
continue
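
The hunk above makes the network index account for group-level networks when rebuilding reservations from existing allocations. A minimal sketch of the caller-visible effect (not part of this commit; NewNetworkIndex and SetNode are assumed from the same structs package):

package example

import "github.com/hashicorp/nomad/nomad/structs"

// reservesSharedNetworks shows that AddAllocs now reserves networks held in
// AllocatedResources.Shared, not just per-task networks.
func reservesSharedNetworks(node *structs.Node, alloc *structs.Allocation) bool {
	idx := structs.NewNetworkIndex()
	idx.SetNode(node) // registers the node's devices, CIDRs, and reserved ports

	// If the alloc's shared networks overlap an existing IP/port/bandwidth
	// reservation, AddAllocs reports the collision.
	return idx.AddAllocs([]*structs.Allocation{alloc})
}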


@@ -2857,6 +2857,14 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
for _, r := range a.Tasks {
c.Flattened.Add(r)
}
// Add network resources that are at the task group level
if len(a.Shared.Networks) > 0 {
for _, network := range a.Shared.Networks {
c.Flattened.Add(&AllocatedTaskResources{
Networks: []*NetworkResource{network},
})
}
}
return c
}
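
With the hunk above, anything that consumes the flattened comparable view sees group-level bandwidth as well. A small sketch of the behavior implied by this change (not part of this commit):

package example

import "github.com/hashicorp/nomad/nomad/structs"

// flattenedIncludesShared shows that Comparable() now folds group-level
// networks into the flattened task resources.
func flattenedIncludesShared() *structs.ComparableResources {
	ar := &structs.AllocatedResources{
		Tasks: map[string]*structs.AllocatedTaskResources{
			"web": {Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 300}}},
		},
		Shared: structs.AllocatedSharedResources{
			Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 500}},
		},
	}
	// Flattened.Networks now carries both the task's 300 MBits and the
	// group's 500 MBits; previously only the task entry was present.
	return ar.Comparable()
}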
@@ -8079,7 +8087,7 @@ func (a *Allocation) SetEventDisplayMessages() {
}
// COMPAT(0.11): Remove in 0.11
-// ComparableResources returns the resouces on the allocation
+// ComparableResources returns the resources on the allocation
// handling upgrade paths. After 0.11 calls to this should be replaced with:
// alloc.AllocatedResources.Comparable()
func (a *Allocation) ComparableResources() *ComparableResources {


@@ -512,7 +512,7 @@ func TestPreemption(t *testing.T) {
},
},
}),
-createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
+createAllocWithTaskgroupNetwork(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
@@ -520,9 +520,13 @@ func TestPreemption(t *testing.T) {
{
Device: "eth0",
IP: "192.168.0.200",
-MBits: 500,
+MBits: 200,
},
},
}, &structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.201",
MBits: 300,
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
@@ -1379,10 +1383,19 @@ func TestPreemption(t *testing.T) {
// helper method to create allocations with given jobs and resources
func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation {
-return createAllocWithDevice(id, job, resource, nil)
+return createAllocInner(id, job, resource, nil, nil)
}
// helper method to create an allocation with a network at the task group level
func createAllocWithTaskgroupNetwork(id string, job *structs.Job, resource *structs.Resources, tgNet *structs.NetworkResource) *structs.Allocation {
return createAllocInner(id, job, resource, nil, tgNet)
}
func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource) *structs.Allocation {
return createAllocInner(id, job, resource, allocatedDevices, nil)
}
func createAllocInner(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource, tgNetwork *structs.NetworkResource) *structs.Allocation {
alloc := &structs.Allocation{
ID: id,
Job: job,
@@ -1413,5 +1426,11 @@ func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resour
if allocatedDevices != nil {
alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{allocatedDevices}
}
if tgNetwork != nil {
alloc.AllocatedResources.Shared = structs.AllocatedSharedResources{
Networks: []*structs.NetworkResource{tgNetwork},
}
}
return alloc
}
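
A note on the helper refactor above: rather than adding a third near-duplicate constructor, createAlloc and createAllocWithDevice now both delegate to a single createAllocInner that takes the optional device and the optional task-group network, so future variants only need to extend one signature.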


@@ -6,7 +6,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
require "github.com/stretchr/testify/require"
"github.com/stretchr/testify/require"
)
func TestFeasibleRankIterator(t *testing.T) {
@@ -127,6 +127,246 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
}
}
// Tests the bin packing iterator with network resources at both the task and task group levels
func TestBinPackIterator_Network_Success(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: &structs.Node{
// Perfect fit
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 2048,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 2048,
},
Networks: []*structs.NetworkResource{
{
Mode: "host",
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
ReservedResources: &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
CpuShares: 1024,
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 1024,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "1000-2000",
},
},
},
},
{
Node: &structs.Node{
// 50% fit
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 4096,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 4096,
},
Networks: []*structs.NetworkResource{
{
Mode: "host",
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
ReservedResources: &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
CpuShares: 1024,
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 1024,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "1000-2000",
},
},
},
},
}
static := NewStaticRankIterator(ctx, nodes)
// Create a task group with networks specified at task and task group level
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 300,
},
},
},
},
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 500,
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
require := require.New(t)
// We expect both nodes to be eligible to place
require.Len(out, 2)
require.Equal(out[0], nodes[0])
require.Equal(out[1], nodes[1])
// First node should have a perfect score
require.Equal(1.0, out[0].FinalScore)
if out[1].FinalScore < 0.75 || out[1].FinalScore > 0.95 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
// Verify network information at the task group level
require.Equal(500, out[0].AllocResources.Networks[0].MBits)
require.Equal(500, out[1].AllocResources.Networks[0].MBits)
// Verify network information at task level
require.Equal(300, out[0].TaskResources["web"].Networks[0].MBits)
require.Equal(300, out[1].TaskResources["web"].Networks[0].MBits)
}
// Tests that the bin packing iterator fails when network bandwidth is
// overprovisioned. This test specifies network resources at both the task
// group and task levels.
func TestBinPackIterator_Network_Failure(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: &structs.Node{
// 50% fit
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 4096,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 4096,
},
Networks: []*structs.NetworkResource{
{
Mode: "host",
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
ReservedResources: &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
CpuShares: 1024,
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 1024,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "1000-2000",
},
},
},
},
}
// Add a planned alloc that takes up some network mbits at task and task group level
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 2048,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 2048,
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.1",
MBits: 300,
},
},
},
},
Shared: structs.AllocatedSharedResources{
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.1",
MBits: 400,
},
},
},
},
},
}
static := NewStaticRankIterator(ctx, nodes)
// Create a task group with networks specified at task and task group level
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 300,
},
},
},
},
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 500,
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
require := require.New(t)
// We expect a placement failure because the group needs 800 MBits
// (300 task-level + 500 group-level) but only 300 of the node's
// 1000 MBits remain after the planned alloc's 700
require.Len(out, 0)
require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"])
}
func TestBinPackIterator_PlannedAlloc(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{


@@ -344,6 +344,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
},
}
if option.AllocResources != nil {
resources.Shared.Networks = option.AllocResources.Networks
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -361,6 +365,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB,
Networks: missing.TaskGroup.Networks,
},
}
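
Taken together, these system-scheduler hunks record group networks in two places: AllocatedResources.Shared.Networks holds what the rank iterator actually placed, while the legacy SharedResources field echoes the task group's requested networks for the COMPAT(0.11) path noted earlier in the diff. A sketch of that wiring (not part of this commit; parameter names are hypothetical):

package example

import "github.com/hashicorp/nomad/nomad/structs"

// buildAlloc mirrors the two assignments added in this file: the selected
// (placed) networks go into the shared allocated resources, the requested
// networks into the compat-era SharedResources block.
func buildAlloc(resources *structs.AllocatedResources,
	selected, requested []*structs.NetworkResource, diskMB int) *structs.Allocation {
	if selected != nil {
		resources.Shared.Networks = selected // as placed, with IPs/ports assigned
	}
	return &structs.Allocation{
		AllocatedResources: resources, // authoritative accounting
		SharedResources: &structs.Resources{ // legacy/compat view
			DiskMB:   diskMB,
			Networks: requested, // as requested by the task group
		},
	}
}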