Handle static port preemption when there are multiple devices

Also added test case
This commit is contained in:
Preetha Appan 2018-11-02 09:09:50 -05:00
parent c33469157d
commit 4182444937
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
2 changed files with 132 additions and 25 deletions

View File

@ -260,11 +260,15 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc
deviceToAllocs := make(map[string][]*structs.Allocation)
MbitsNeeded := networkResourceAsk.MBits
reservedPortsNeeded := networkResourceAsk.ReservedPorts
// Build map of reserved ports needed for fast access
reservedPorts := make(map[int]interface{})
for _, port := range reservedPortsNeeded {
reservedPorts[port.Value] = struct{}{}
}
usedPortToAlloc := make(map[int]*structs.Allocation)
// Create a map from each device to allocs
// We can only preempt within allocations that
// are using the same device
@ -294,6 +298,14 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc
allocsForDevice := deviceToAllocs[device]
allocsForDevice = append(allocsForDevice, alloc)
deviceToAllocs[device] = allocsForDevice
// Populate map from used reserved ports to allocation
for _, n := range allocResources.Flattened.Networks {
reservedPorts := n.ReservedPorts
for _, p := range reservedPorts {
usedPortToAlloc[p.Value] = alloc
}
}
}
}
@ -323,31 +335,33 @@ OUTER:
// Reset allocsToPreempt since we don't want to preempt across devices for the same task
allocsToPreempt = nil
// Build map from used reserved ports to allocation
usedPortToAlloc := make(map[int]*structs.Allocation)
// First try to satisfy needed reserved ports
if len(reservedPortsNeeded) > 0 {
for _, alloc := range currentAllocs {
allocResources := p.allocDetails[alloc.ID].resources
for _, n := range allocResources.Flattened.Networks {
reservedPorts := n.ReservedPorts
for _, p := range reservedPorts {
usedPortToAlloc[p.Value] = alloc
}
}
}
// Look for allocs that are using reserved ports needed
for _, port := range reservedPortsNeeded {
alloc, ok := usedPortToAlloc[port.Value]
if ok {
// If the reserved port is in the map, we need
// to check if the device matches.
allocResources := p.allocDetails[alloc.ID].resources
allocDevice := allocResources.Flattened.Networks[0].Device
if allocDevice == device {
preemptedBandwidth += allocResources.Flattened.Networks[0].MBits
allocsToPreempt = append(allocsToPreempt, alloc)
} else {
// If its on a different device we continue to the
// outer loop because the current device cannot meet
// a needed, used reserved port
continue OUTER
}
}
// If the reserved port is not in the usedPortToAlloc map it means
// no alloc is using it.
}
// Remove allocs that were preempted to satisfy reserved ports
currentAllocs = structs.RemoveAllocs(currentAllocs, allocsToPreempt)
}

View File

@ -163,7 +163,8 @@ func TestPreemption(t *testing.T) {
// Create some persistent alloc ids to use in test cases
allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}
nodeResources := &structs.Resources{
// TODO(preetha): Switch to using NodeResources and NodeReservedResources
defaultNodeResources := &structs.Resources{
CPU: 4000,
MemoryMB: 8192,
DiskMB: 100 * 1024,
@ -208,7 +209,7 @@ func TestPreemption(t *testing.T) {
},
})},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 2000,
@ -241,7 +242,7 @@ func TestPreemption(t *testing.T) {
},
})},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 4000,
@ -295,7 +296,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 600,
@ -316,6 +317,97 @@ func TestPreemption(t *testing.T) {
},
},
},
{
desc: "preempt only from device that has allocation with used reserved port",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1200,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth1",
IP: "192.168.0.200",
MBits: 600,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 600,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
// This test sets up a node with two NICs
nodeCapacity: &structs.Resources{
CPU: 4000,
MemoryMB: 8192,
DiskMB: 100 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
{
Device: "eth1",
CIDR: "192.168.1.100/32",
MBits: 1000,
},
},
},
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 600,
MemoryMB: 1000,
DiskMB: 25 * 1024,
Networks: []*structs.NetworkResource{
{
IP: "192.168.0.100",
MBits: 700,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
},
},
{
desc: "Combination of high/low priority allocs, without static ports",
currentAllocations: []*structs.Allocation{
@ -365,7 +457,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1100,
@ -426,7 +518,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1000,
@ -487,7 +579,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 300,
@ -555,7 +647,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 2700,
@ -630,7 +722,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 600,
@ -698,7 +790,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 300,
@ -775,7 +867,7 @@ func TestPreemption(t *testing.T) {
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
nodeCapacity: defaultNodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1000,
@ -800,6 +892,7 @@ func TestPreemption(t *testing.T) {
node := mock.Node()
node.Resources = tc.nodeCapacity
node.Reserved = tc.nodeReservedCapacity
node.NodeResources = nil
state, ctx := testContext(t)
nodes := []*RankedNode{