Merge pull request #4881 from hashicorp/f-device-preemption

Device preemption
Preetha 2018-12-11 18:34:19 -06:00 committed by GitHub
commit f406e66ab8
4 changed files with 594 additions and 8 deletions

nomad/structs/devices.go

@@ -129,3 +129,14 @@ func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision bool) {
    return
}

// FreeCount returns the number of free device instances
func (i *DeviceAccounterInstance) FreeCount() int {
    count := 0
    for _, c := range i.Instances {
        if c == 0 {
            count++
        }
    }
    return count
}
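For context on how FreeCount is used by the preemption logic below: an instance is free when its reservation count is zero. A minimal standalone sketch of the same counting, using a hypothetical instanceCounts stand-in rather than the real DeviceAccounterInstance type:

package main

import "fmt"

// instanceCounts mirrors the role of the Instances field above: a map
// from device instance ID to the number of allocations reserving it.
// This stand-in type is illustrative only.
type instanceCounts map[string]int

// freeCount reproduces the counting logic above: an instance with a
// zero reservation count is free.
func freeCount(instances instanceCounts) int {
    count := 0
    for _, c := range instances {
        if c == 0 {
            count++
        }
    }
    return count
}

func main() {
    // dev0 and dev1 are in use; dev2 and dev3 are unreserved.
    instances := instanceCounts{"dev0": 1, "dev1": 2, "dev2": 0, "dev3": 0}
    fmt.Println(freeCount(instances)) // prints 2
}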

scheduler/preemption.go

@@ -113,13 +113,17 @@ type Preemptor struct {

    // currentAllocs is the candidate set used to find preemptible allocations
    currentAllocs []*structs.Allocation

    // ctx is the context from the scheduler stack
    ctx Context
}

-func NewPreemptor(jobPriority int) *Preemptor {
+func NewPreemptor(jobPriority int, ctx Context) *Preemptor {
    return &Preemptor{
        currentPreemptions: make(map[structs.NamespacedID]map[string]int),
        jobPriority:        jobPriority,
        allocDetails:       make(map[string]*allocInfo),
        ctx:                ctx,
    }
}
@@ -435,6 +439,156 @@ OUTER:
    return filteredBestAllocs
}

// deviceGroupAllocs represents a group of allocs that share a device
type deviceGroupAllocs struct {
    allocs []*structs.Allocation

    // deviceInstances tracks the number of device instances used per alloc ID
    deviceInstances map[string]int
}

func newAllocDeviceGroup() *deviceGroupAllocs {
    return &deviceGroupAllocs{
        deviceInstances: make(map[string]int),
    }
}

// PreemptForDevice tries to find allocations to preempt to meet the number of
// device instances needed. It is called once per device request when assigning
// devices to the task
func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *deviceAllocator) []*structs.Allocation {

    // Group allocations by device, tracking the number of
    // instances used in each device by alloc id
    deviceToAllocs := make(map[structs.DeviceIdTuple]*deviceGroupAllocs)
    for _, alloc := range p.currentAllocs {
        for _, tr := range alloc.AllocatedResources.Tasks {

            // Ignore allocs that don't use devices
            if len(tr.Devices) == 0 {
                continue
            }

            // Go through each assigned device group
            for _, device := range tr.Devices {

                // Look up the device instance from the device allocator
                deviceIdTuple := *device.ID()
                devInst := devAlloc.Devices[deviceIdTuple]

                // devInst can be nil if the device is no longer healthy
                if devInst == nil {
                    continue
                }

                // Ignore if the device doesn't match the ask
                if !nodeDeviceMatches(p.ctx, devInst.Device, ask) {
                    continue
                }

                // Store both the alloc and the number of instances used
                // in our tracking map
                allocDeviceGrp := deviceToAllocs[deviceIdTuple]
                if allocDeviceGrp == nil {
                    allocDeviceGrp = newAllocDeviceGroup()
                    deviceToAllocs[deviceIdTuple] = allocDeviceGrp
                }
                allocDeviceGrp.allocs = append(allocDeviceGrp.allocs, alloc)
                allocDeviceGrp.deviceInstances[alloc.ID] += len(device.DeviceIDs)
            }
        }
    }

    neededCount := ask.Count

    var preemptionOptions []*deviceGroupAllocs
    // Examine matching allocs by device
OUTER:
    for deviceIDTuple, allocsGrp := range deviceToAllocs {

        // First group and sort allocations using this device by priority
        allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, allocsGrp.allocs)

        // Reset preempted count for this device
        preemptedCount := 0

        // Initialize slice of preempted allocations
        var preemptedAllocs []*structs.Allocation

        for _, grpAllocs := range allocsByPriority {
            for _, alloc := range grpAllocs.allocs {

                // Look up the device instance from the device allocator
                devInst := devAlloc.Devices[deviceIDTuple]

                // Add to preemption list because this device matches
                preemptedCount += allocsGrp.deviceInstances[alloc.ID]
                preemptedAllocs = append(preemptedAllocs, alloc)

                // Check if we met the needed count
                if preemptedCount+devInst.FreeCount() >= int(neededCount) {
                    preemptionOptions = append(preemptionOptions, &deviceGroupAllocs{
                        allocs:          preemptedAllocs,
                        deviceInstances: allocsGrp.deviceInstances,
                    })
                    continue OUTER
                }
            }
        }
    }

    // Find the combination of allocs with the lowest net priority
    if len(preemptionOptions) > 0 {
        return selectBestAllocs(preemptionOptions, int(neededCount))
    }

    return nil
}
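The key stopping condition above is preemptedCount+devInst.FreeCount() >= int(neededCount): allocations on a device are added to the preemption list, in ascending priority order, only until the instances they would give back plus the instances already free on that device cover the ask. A self-contained sketch of that arithmetic (canSatisfy and its parameters are hypothetical names, not scheduler API):

package main

import "fmt"

// canSatisfy walks allocs in priority order, accumulating the device
// instances each one would give back, and stops as soon as reclaimed
// plus already-free instances cover the requested count.
func canSatisfy(instancesPerAlloc []int, freeInstances, needed int) (used int, ok bool) {
    preempted := 0
    for i, n := range instancesPerAlloc {
        preempted += n
        if preempted+freeInstances >= needed {
            return i + 1, true
        }
    }
    return len(instancesPerAlloc), false
}

func main() {
    // Two allocs hold 2 and 1 instances; 1 instance is free; the ask is 4.
    // Preempting both allocs yields 3 reclaimed + 1 free = 4, meeting the ask.
    n, ok := canSatisfy([]int{2, 1}, 1, 4)
    fmt.Println(n, ok) // prints: 2 true

    // An ask of 6 can never be met on this device, which mirrors the
    // "more instances needed than available" test case further below.
    _, ok = canSatisfy([]int{2, 1}, 1, 6)
    fmt.Println(ok) // prints: false
}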
// selectBestAllocs finds the best allocations based on minimal net priority amongst
// all options. The net priority is the sum of unique priorities in each option
func selectBestAllocs(preemptionOptions []*deviceGroupAllocs, neededCount int) []*structs.Allocation {
    bestPriority := math.MaxInt32
    var bestAllocs []*structs.Allocation

    // We iterate over allocations in priority order, so it's possible
    // that we have more allocations than needed to meet the needed count.
    // e.g. we need 4 instances, and we get 3 from a priority 10 alloc, and 4 from
    // a priority 20 alloc. We should filter out the priority 10 alloc in that case.
    // This loop does a filter and chooses the set with the smallest net priority
    for _, allocGrp := range preemptionOptions {

        // Find unique priorities and add them to calculate net priority
        priorities := map[int]struct{}{}
        netPriority := 0

        devInst := allocGrp.deviceInstances
        var filteredAllocs []*structs.Allocation

        // Sort by number of device instances used, descending
        sort.Slice(allocGrp.allocs, func(i, j int) bool {
            instanceCount1 := devInst[allocGrp.allocs[i].ID]
            instanceCount2 := devInst[allocGrp.allocs[j].ID]
            return instanceCount1 > instanceCount2
        })

        // Filter and calculate net priority
        preemptedInstanceCount := 0
        for _, alloc := range allocGrp.allocs {
            if preemptedInstanceCount >= neededCount {
                break
            }
            instanceCount := devInst[alloc.ID]
            preemptedInstanceCount += instanceCount
            filteredAllocs = append(filteredAllocs, alloc)

            if _, ok := priorities[alloc.Job.Priority]; !ok {
                priorities[alloc.Job.Priority] = struct{}{}
                netPriority += alloc.Job.Priority
            }
        }
        if netPriority < bestPriority {
            bestPriority = netPriority
            bestAllocs = filteredAllocs
        }
    }
    return bestAllocs
}
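To make the net-priority tie-break concrete: in the "lower/higher priority combinations" test case below, the 1080ti option would preempt a priority 30 and a priority 40 alloc (net 70), while the 2080ti option preempts two priority 30 allocs (net 30, since each distinct priority is summed once, not once per alloc), so the 2080ti allocs win. A small sketch of that calculation with a hypothetical helper:

package main

import "fmt"

// netPriority sums each distinct job priority once, matching the
// unique-priority accumulation in selectBestAllocs.
func netPriority(allocPriorities []int) int {
    seen := map[int]struct{}{}
    total := 0
    for _, p := range allocPriorities {
        if _, ok := seen[p]; !ok {
            seen[p] = struct{}{}
            total += p
        }
    }
    return total
}

func main() {
    fmt.Println(netPriority([]int{30, 30})) // 30: two allocs, one unique priority
    fmt.Println(netPriority([]int{40}))     // 40: loses to the option above
    fmt.Println(netPriority([]int{30, 40})) // 70: mixing priorities is penalized
}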
// basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk.
// Values emitted are in the range [0, maxFloat]
func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 {

scheduler/preemption_test.go

@@ -4,9 +4,12 @@ import (
    "fmt"
    "testing"
    "strconv"

    "github.com/hashicorp/nomad/helper/uuid"
    "github.com/hashicorp/nomad/nomad/mock"
    "github.com/hashicorp/nomad/nomad/structs"
    psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
    "github.com/stretchr/testify/require"
)
@@ -157,10 +160,15 @@ func TestPreemption(t *testing.T) {
    lowPrioJob.Priority = 30

    lowPrioJob2 := mock.Job()
-    lowPrioJob2.Priority = 30
+    lowPrioJob2.Priority = 40

    // Create some persistent alloc ids to use in test cases
-    allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}
+    allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}

    var deviceIDs []string
    for i := 0; i < 10; i++ {
        deviceIDs = append(deviceIDs, "dev"+strconv.Itoa(i))
    }
    defaultNodeResources := &structs.NodeResources{
        Cpu: structs.NodeCpuResources{
@@ -179,6 +187,88 @@ func TestPreemption(t *testing.T) {
                MBits: 1000,
            },
        },
        Devices: []*structs.NodeDeviceResource{
            {
                Type:   "gpu",
                Vendor: "nvidia",
                Name:   "1080ti",
                Attributes: map[string]*psstructs.Attribute{
                    "memory":           psstructs.NewIntAttribute(11, psstructs.UnitGiB),
                    "cuda_cores":       psstructs.NewIntAttribute(3584, ""),
                    "graphics_clock":   psstructs.NewIntAttribute(1480, psstructs.UnitMHz),
                    "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS),
                },
                Instances: []*structs.NodeDevice{
                    {
                        ID:      deviceIDs[0],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[1],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[2],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[3],
                        Healthy: true,
                    },
                },
            },
            {
                Type:   "gpu",
                Vendor: "nvidia",
                Name:   "2080ti",
                Attributes: map[string]*psstructs.Attribute{
                    "memory":           psstructs.NewIntAttribute(11, psstructs.UnitGiB),
                    "cuda_cores":       psstructs.NewIntAttribute(3584, ""),
                    "graphics_clock":   psstructs.NewIntAttribute(1480, psstructs.UnitMHz),
                    "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS),
                },
                Instances: []*structs.NodeDevice{
                    {
                        ID:      deviceIDs[4],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[5],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[6],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[7],
                        Healthy: true,
                    },
                    {
                        ID:      deviceIDs[8],
                        Healthy: true,
                    },
                },
            },
            {
                Type:   "fpga",
                Vendor: "intel",
                Name:   "F100",
                Attributes: map[string]*psstructs.Attribute{
                    "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB),
                },
                Instances: []*structs.NodeDevice{
                    {
                        ID:      "fpga1",
                        Healthy: true,
                    },
                    {
                        ID:      "fpga2",
                        Healthy: false,
                    },
                },
            },
        },
    }
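The device asks in the test cases below use two of the name forms Nomad accepts for a device request: the fully qualified vendor/type/name form, which pins the ask to a single device group, and the bare type form, which any vendor and model of that type can satisfy. A short illustration against the fixture above (the surrounding program scaffolding is illustrative):

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    // Fully qualified ask: only the 1080ti pool can satisfy it.
    exact := &structs.RequestedDevice{Name: "nvidia/gpu/1080ti", Count: 4}

    // Type-only ask: both the 1080ti and 2080ti pools are eligible, so
    // preemption must additionally pick allocs that share one device.
    anyGPU := &structs.RequestedDevice{Name: "gpu", Count: 4}

    fmt.Println(exact.Name, anyGPU.Name)
}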
    reservedNodeResources := &structs.NodeReservedResources{
@@ -807,6 +897,288 @@ func TestPreemption(t *testing.T) {
                allocIDs[1]: {},
            },
        },
        {
            desc: "Preemption with one device instance per alloc",
            // Add two allocations that each use one device instance
            currentAllocations: []*structs.Allocation{
                createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
                    CPU:      500,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[0]},
                }),
                createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[1]},
                })},
            nodeReservedCapacity: reservedNodeResources,
            nodeCapacity:         defaultNodeResources,
            jobPriority:          100,
            resourceAsk: &structs.Resources{
                CPU:      1000,
                MemoryMB: 512,
                DiskMB:   4 * 1024,
                Devices: []*structs.RequestedDevice{
                    {
                        Name:  "nvidia/gpu/1080ti",
                        Count: 4,
                    },
                },
            },
            preemptedAllocIDs: map[string]struct{}{
                allocIDs[0]: {},
                allocIDs[1]: {},
            },
        },
        {
            desc: "Preemption multiple devices used",
            currentAllocations: []*structs.Allocation{
                createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
                    CPU:      500,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[0], deviceIDs[1], deviceIDs[2], deviceIDs[3]},
                }),
                createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "fpga",
                    Vendor:    "intel",
                    Name:      "F100",
                    DeviceIDs: []string{"fpga1"},
                })},
            nodeReservedCapacity: reservedNodeResources,
            nodeCapacity:         defaultNodeResources,
            jobPriority:          100,
            resourceAsk: &structs.Resources{
                CPU:      1000,
                MemoryMB: 512,
                DiskMB:   4 * 1024,
                Devices: []*structs.RequestedDevice{
                    {
                        Name:  "nvidia/gpu/1080ti",
                        Count: 4,
                    },
                },
            },
            preemptedAllocIDs: map[string]struct{}{
                allocIDs[0]: {},
            },
        },
        {
            // This test case creates allocations across two GPUs.
            // Both GPUs are eligible for the task, but only allocs sharing the
            // same device should be chosen for preemption
            desc: "Preemption with allocs across multiple devices that match",
            currentAllocations: []*structs.Allocation{
                createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
                    CPU:      500,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[0], deviceIDs[1]},
                }),
                createAllocWithDevice(allocIDs[1], highPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 100,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[2]},
                }),
                createAllocWithDevice(allocIDs[2], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 256,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "2080ti",
                    DeviceIDs: []string{deviceIDs[4], deviceIDs[5]},
                }),
                createAllocWithDevice(allocIDs[3], lowPrioJob, &structs.Resources{
                    CPU:      100,
                    MemoryMB: 256,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "2080ti",
                    DeviceIDs: []string{deviceIDs[6], deviceIDs[7]},
                }),
                createAllocWithDevice(allocIDs[4], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "fpga",
                    Vendor:    "intel",
                    Name:      "F100",
                    DeviceIDs: []string{"fpga1"},
                })},
            nodeReservedCapacity: reservedNodeResources,
            nodeCapacity:         defaultNodeResources,
            jobPriority:          100,
            resourceAsk: &structs.Resources{
                CPU:      1000,
                MemoryMB: 512,
                DiskMB:   4 * 1024,
                Devices: []*structs.RequestedDevice{
                    {
                        Name:  "gpu",
                        Count: 4,
                    },
                },
            },
            preemptedAllocIDs: map[string]struct{}{
                allocIDs[2]: {},
                allocIDs[3]: {},
            },
        },
        {
            // This test case creates allocations across two GPUs.
            // Both GPUs are eligible for the task, but only allocs with the lower
            // priority are chosen
            desc: "Preemption with lower/higher priority combinations",
            currentAllocations: []*structs.Allocation{
                createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
                    CPU:      500,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[0], deviceIDs[1]},
                }),
                createAllocWithDevice(allocIDs[1], lowPrioJob2, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 100,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[2], deviceIDs[3]},
                }),
                createAllocWithDevice(allocIDs[2], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 256,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "2080ti",
                    DeviceIDs: []string{deviceIDs[4], deviceIDs[5]},
                }),
                createAllocWithDevice(allocIDs[3], lowPrioJob, &structs.Resources{
                    CPU:      100,
                    MemoryMB: 256,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "2080ti",
                    DeviceIDs: []string{deviceIDs[6], deviceIDs[7]},
                }),
                createAllocWithDevice(allocIDs[4], lowPrioJob, &structs.Resources{
                    CPU:      100,
                    MemoryMB: 256,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "2080ti",
                    DeviceIDs: []string{deviceIDs[8]},
                }),
                createAllocWithDevice(allocIDs[5], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "fpga",
                    Vendor:    "intel",
                    Name:      "F100",
                    DeviceIDs: []string{"fpga1"},
                })},
            nodeReservedCapacity: reservedNodeResources,
            nodeCapacity:         defaultNodeResources,
            jobPriority:          100,
            resourceAsk: &structs.Resources{
                CPU:      1000,
                MemoryMB: 512,
                DiskMB:   4 * 1024,
                Devices: []*structs.RequestedDevice{
                    {
                        Name:  "gpu",
                        Count: 4,
                    },
                },
            },
            preemptedAllocIDs: map[string]struct{}{
                allocIDs[2]: {},
                allocIDs[3]: {},
            },
        },
        {
            desc: "Device preemption not possible due to more instances needed than available",
            currentAllocations: []*structs.Allocation{
                createAllocWithDevice(allocIDs[0], lowPrioJob, &structs.Resources{
                    CPU:      500,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "gpu",
                    Vendor:    "nvidia",
                    Name:      "1080ti",
                    DeviceIDs: []string{deviceIDs[0], deviceIDs[1], deviceIDs[2], deviceIDs[3]},
                }),
                createAllocWithDevice(allocIDs[1], lowPrioJob, &structs.Resources{
                    CPU:      200,
                    MemoryMB: 512,
                    DiskMB:   4 * 1024,
                }, &structs.AllocatedDeviceResource{
                    Type:      "fpga",
                    Vendor:    "intel",
                    Name:      "F100",
                    DeviceIDs: []string{"fpga1"},
                })},
            nodeReservedCapacity: reservedNodeResources,
            nodeCapacity:         defaultNodeResources,
            jobPriority:          100,
            resourceAsk: &structs.Resources{
                CPU:      1000,
                MemoryMB: 512,
                DiskMB:   4 * 1024,
                Devices: []*structs.RequestedDevice{
                    {
                        Name:  "gpu",
                        Count: 6,
                    },
                },
            },
        },
        // This test case exercises the code path for a final filtering step that tries to
        // minimize the number of preemptible allocations
        {
@@ -925,6 +1297,10 @@ func TestPreemption(t *testing.T) {

// helper method to create allocations with given jobs and resources
func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation {
    return createAllocWithDevice(id, job, resource, nil)
}

func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource) *structs.Allocation {
    alloc := &structs.Allocation{
        ID:  id,
        Job: job,
@@ -938,6 +1314,23 @@ func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation {
        DesiredStatus: structs.AllocDesiredStatusRun,
        ClientStatus:  structs.AllocClientStatusRunning,
        TaskGroup:     "web",
        AllocatedResources: &structs.AllocatedResources{
            Tasks: map[string]*structs.AllocatedTaskResources{
                "web": {
                    Cpu: structs.AllocatedCpuResources{
                        CpuShares: int64(resource.CPU),
                    },
                    Memory: structs.AllocatedMemoryResources{
                        MemoryMB: int64(resource.MemoryMB),
                    },
                    Networks: resource.Networks,
                },
            },
        },
    }
    if allocatedDevices != nil {
        alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{allocatedDevices}
    }
    return alloc
}

scheduler/rank.go

@@ -211,7 +211,7 @@ OUTER:
        var allocsToPreempt []*structs.Allocation

        // Initialize preemptor with node
-        preemptor := NewPreemptor(iter.priority)
+        preemptor := NewPreemptor(iter.priority, iter.ctx)
        preemptor.SetNode(option.Node)

        // Count the number of existing preemptions
@@ -251,7 +251,7 @@ OUTER:
                netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
                if netPreemptions == nil {
-                    iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unable to meet network resource %v after preemption", ask))
+                    iter.ctx.Logger().Named("binpack").Error("preemption not possible ", "network_resource", ask)
                    netIdx.Release()
                    continue OUTER
                }
@@ -268,7 +268,7 @@ OUTER:
                offer, err = netIdx.AssignNetwork(ask)
                if offer == nil {
-                    iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("unexpected error, unable to create offer after preempting:%v", err))
+                    iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create network offer after considering preemption", "error", err)
                    netIdx.Release()
                    continue OUTER
                }
@@ -285,8 +285,36 @@ OUTER:
            for _, req := range task.Resources.Devices {
                offer, sumAffinities, err := devAllocator.AssignDevice(req)
                if offer == nil {
-                    iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err))
-                    continue OUTER
+                    // If eviction is not enabled, mark this node as exhausted and continue
+                    if !iter.evict {
+                        iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err))
+                        continue OUTER
+                    }
+
+                    // Attempt preemption
+                    preemptor.SetCandidates(proposed)
+                    devicePreemptions := preemptor.PreemptForDevice(req, devAllocator)
+                    if devicePreemptions == nil {
+                        iter.ctx.Logger().Named("binpack").Error("preemption not possible", "requested_device", req)
+                        netIdx.Release()
+                        continue OUTER
+                    }
+                    allocsToPreempt = append(allocsToPreempt, devicePreemptions...)
+
+                    // First subtract out preempted allocations
+                    proposed = structs.RemoveAllocs(proposed, allocsToPreempt)
+
+                    // Reset the device allocator with new set of proposed allocs
+                    devAllocator := newDeviceAllocator(iter.ctx, option.Node)
+                    devAllocator.AddAllocs(proposed)
+
+                    // Try offer again
+                    offer, sumAffinities, err = devAllocator.AssignDevice(req)
+                    if offer == nil {
+                        iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create device offer after considering preemption", "error", err)
+                        continue OUTER
+                    }
+                }

                // Store the resource
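The added device branch mirrors the evict-and-retry shape of the network path earlier in this file: try the assignment, and only if it fails (and eviction is enabled) preempt, rebuild the allocator state from the reduced proposed set, and try exactly once more. A schematic of that control flow, with hypothetical types standing in for the device allocator and preemptor:

package main

import (
    "errors"
    "fmt"
)

// assignFn models devAllocator.AssignDevice; preemptFn models
// preemptor.PreemptForDevice returning nil when preemption cannot help.
// Both are illustrative stand-ins, not scheduler API.
type assignFn func() (offer string, err error)
type preemptFn func() (victims []string)

// assignWithPreemption mirrors the shape of the device branch above:
// try once, preempt on failure, rebuild, then try exactly once more.
func assignWithPreemption(assign, retry assignFn, preempt preemptFn) (string, error) {
    if offer, err := assign(); err == nil {
        return offer, nil
    }
    victims := preempt()
    if victims == nil {
        return "", errors.New("preemption not possible")
    }
    // In the scheduler, the preempted allocs are removed from the
    // proposed set and a fresh device allocator is built before retrying.
    return retry()
}

func main() {
    fail := func() (string, error) { return "", errors.New("exhausted") }
    ok := func() (string, error) { return "gpu[dev0]", nil }
    preempt := func() []string { return []string{"alloc-1"} }

    offer, err := assignWithPreemption(fail, ok, preempt)
    fmt.Println(offer, err) // prints: gpu[dev0] <nil>
}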