From 2d2248e209e5796cb35cdf589a6482bb20e12fda Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 15 Oct 2018 13:45:38 -0700 Subject: [PATCH 1/6] Add devices to allocated resources --- nomad/structs/structs.go | 77 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d5665f9b8..d2a54b14d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2694,6 +2694,7 @@ type AllocatedTaskResources struct { Cpu AllocatedCpuResources Memory AllocatedMemoryResources Networks Networks + Devices []*AllocatedDeviceResource } func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { @@ -2702,6 +2703,8 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { } newA := new(AllocatedTaskResources) *newA = *a + + // Copy the networks if a.Networks != nil { n := len(a.Networks) newA.Networks = make([]*NetworkResource, n) @@ -2709,6 +2712,16 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { newA.Networks[i] = a.Networks[i].Copy() } } + + // Copy the devices + if newA.Devices != nil { + n := len(a.Devices) + newA.Devices = make([]*AllocatedDeviceResource, n) + for i := 0; i < n; i++ { + newA.Devices[i] = a.Devices[i].Copy() + } + } + return newA } @@ -2734,6 +2747,16 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) { a.Networks[idx].Add(n) } } + + for _, d := range delta.Devices { + // Find the matching device + idx := AllocatedDevices(delta.Devices).Index(d) + if idx == -1 { + a.Devices = append(a.Devices, d.Copy()) + } else { + a.Devices[idx].Add(d) + } + } } // Comparable turns AllocatedTaskResources into ComparableResources @@ -2831,6 +2854,60 @@ func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) { a.MemoryMB -= delta.MemoryMB } +type AllocatedDevices []*AllocatedDeviceResource + +// Index finds the matching index using the passed device. If not found, -1 is +// returned. +func (a AllocatedDevices) Index(d *AllocatedDeviceResource) int { + if d == nil { + return -1 + } + + for i, o := range a { + if o.Vendor == d.Vendor && o.Type == d.Type && o.Name == d.Name { + return i + } + } + + return -1 +} + +// AllocatedDeviceResource captures a set of allocated devices. +type AllocatedDeviceResource struct { + // Vendor, Type, and Name are used to select the plugin to request the + // device IDs from. + Vendor string + Type string + Name string + + // DeviceIDs is the set of allocated devices + DeviceIDs []string +} + +func (a *AllocatedDeviceResource) Add(delta *AllocatedDeviceResource) { + if delta == nil { + return + } + + a.DeviceIDs = append(a.DeviceIDs, delta.DeviceIDs...) +} + +func (a *AllocatedDeviceResource) Copy() *AllocatedDeviceResource { + if a == nil { + return a + } + + na := *a + + // Copy the devices + na.DeviceIDs = make([]string, len(a.DeviceIDs)) + for i, id := range a.DeviceIDs { + na.DeviceIDs[i] = id + } + + return &na +} + // ComparableResources is the set of resources allocated to a task group but // not keyed by Task, making it easier to compare. 
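As an aside (not part of the patch), the Add/Index pair introduced above merges device groups by their vendor/type/name tuple: when the delta's group already exists on the receiver its instance IDs are appended to it, otherwise the group is copied in whole. A minimal sketch of the intended merge behavior, assuming only the fields added above and the usual nomad/structs import path:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Task resources that already hold one instance of the nvidia/gpu/1080ti group.
	tr := &structs.AllocatedTaskResources{
		Devices: []*structs.AllocatedDeviceResource{
			{Vendor: "nvidia", Type: "gpu", Name: "1080ti", DeviceIDs: []string{"id-0"}},
		},
	}

	// A delta that allocates a second instance of the same group.
	delta := &structs.AllocatedTaskResources{
		Devices: []*structs.AllocatedDeviceResource{
			{Vendor: "nvidia", Type: "gpu", Name: "1080ti", DeviceIDs: []string{"id-1"}},
		},
	}

	// Add locates the matching group via AllocatedDevices.Index and appends the IDs.
	tr.Add(delta)
	fmt.Println(tr.Devices[0].DeviceIDs) // [id-0 id-1]
}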
type ComparableResources struct { From feb83a2be399314c3cdbf1a5b2017f37cd6bbf9a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 15 Oct 2018 15:15:46 -0700 Subject: [PATCH 2/6] assign devices --- nomad/mock/mock.go | 30 +++ nomad/structs/structs.go | 27 +- plugins/shared/structs/units.go | 2 +- scheduler/device.go | 200 ++++++++++++++ scheduler/device_test.go | 455 ++++++++++++++++++++++++++++++++ scheduler/feasible.go | 22 +- scheduler/feasible_test.go | 28 +- scheduler/rank.go | 15 ++ scheduler/rank_test.go | 274 +++++++++++++++++++ 9 files changed, 1018 insertions(+), 35 deletions(-) create mode 100644 scheduler/device.go create mode 100644 scheduler/device_test.go diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 3b90ee15d..ea34170a5 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) func Node() *structs.Node { @@ -91,6 +92,35 @@ func Node() *structs.Node { return node } +func NvidiaNode() *structs.Node { + n := Node() + n.NodeResources.Devices = []*structs.NodeDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }, + } + n.ComputeClass() + return n +} + func HCL() string { return `job "my-job" { datacenters = ["dc1"] diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d2a54b14d..f999739ae 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2388,6 +2388,17 @@ func (id *DeviceIdTuple) Matches(other *DeviceIdTuple) bool { return true } +// Equals returns if this Device ID is the same as the passed ID. +func (id *DeviceIdTuple) Equals(o *DeviceIdTuple) bool { + if id == nil && o == nil { + return true + } else if id == nil || o == nil { + return false + } + + return o.Vendor == id.Vendor && o.Type == id.Type && o.Name == id.Name +} + // NodeDeviceResource captures a set of devices sharing a common // vendor/type/device_name tuple. 
type NodeDeviceResource struct { @@ -2750,7 +2761,7 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) { for _, d := range delta.Devices { // Find the matching device - idx := AllocatedDevices(delta.Devices).Index(d) + idx := AllocatedDevices(a.Devices).Index(d) if idx == -1 { a.Devices = append(a.Devices, d.Copy()) } else { @@ -2864,7 +2875,7 @@ func (a AllocatedDevices) Index(d *AllocatedDeviceResource) int { } for i, o := range a { - if o.Vendor == d.Vendor && o.Type == d.Type && o.Name == d.Name { + if o.ID().Equals(d.ID()) { return i } } @@ -2884,6 +2895,18 @@ type AllocatedDeviceResource struct { DeviceIDs []string } +func (a *AllocatedDeviceResource) ID() *DeviceIdTuple { + if a == nil { + return nil + } + + return &DeviceIdTuple{ + Vendor: a.Vendor, + Type: a.Type, + Name: a.Name, + } +} + func (a *AllocatedDeviceResource) Add(delta *AllocatedDeviceResource) { if delta == nil { return diff --git a/plugins/shared/structs/units.go b/plugins/shared/structs/units.go index 597be99c6..4310b8443 100644 --- a/plugins/shared/structs/units.go +++ b/plugins/shared/structs/units.go @@ -206,7 +206,7 @@ var ( { Name: UnitMHz, Base: UnitHertz, - Multiplier: Pow(1000, 1), + Multiplier: Pow(1000, 2), }, { Name: UnitGHz, diff --git a/scheduler/device.go b/scheduler/device.go new file mode 100644 index 000000000..94f683904 --- /dev/null +++ b/scheduler/device.go @@ -0,0 +1,200 @@ +package scheduler + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/structs" +) + +type deviceAllocator struct { + ctx Context + devices map[structs.DeviceIdTuple]*deviceAllocatorInstance +} + +type deviceAllocatorInstance struct { + d *structs.NodeDeviceResource + instances map[string]int +} + +// Free returns if the device is free to use. +func (d *deviceAllocatorInstance) Free(id string) bool { + uses, ok := d.instances[id] + return ok && uses == 0 +} + +func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { + numDevices := 0 + var devices []*structs.NodeDeviceResource + + // COMPAT(0.11): Remove in 0.11 + if n.NodeResources != nil { + numDevices = len(n.NodeResources.Devices) + devices = n.NodeResources.Devices + } + + d := &deviceAllocator{ + ctx: ctx, + devices: make(map[structs.DeviceIdTuple]*deviceAllocatorInstance, numDevices), + } + + for _, dev := range devices { + id := *dev.ID() + d.devices[id] = &deviceAllocatorInstance{ + d: dev, + instances: make(map[string]int, len(dev.Instances)), + } + for _, instance := range dev.Instances { + // Skip unhealthy devices as they aren't allocatable + if !instance.Healthy { + continue + } + + d.devices[id].instances[instance.ID] = 0 + } + } + + return d +} + +// AddAllocs takes a set of allocations and internally marks which devices are +// used. +func (d *deviceAllocator) AddAllocs(allocs []*structs.Allocation) (collision bool) { + for _, a := range allocs { + // Filter any terminal allocation + if a.TerminalStatus() { + continue + } + + // COMPAT(0.11): Remove in 0.11 + // If the alloc doesn't have the new style resources, it can't have + // devices + if a.AllocatedResources == nil { + continue + } + + // Go through each task resource + for _, tr := range a.AllocatedResources.Tasks { + + // Go through each assigned device group + for _, device := range tr.Devices { + devID := device.ID() + + // Go through each assigned device + for _, instanceID := range device.DeviceIDs { + + // Mark that we are using the device. It may not be in the + // map if the device is no longer being fingerprinted, is + // unhealthy, etc. 
+ if devInst, ok := d.devices[*devID]; ok { + if i, ok := devInst.instances[instanceID]; ok { + // Mark that the device is in use + devInst.instances[instanceID]++ + + if i != 0 { + collision = true + } + } + } + } + } + } + } + + return +} + +func (d *deviceAllocator) AddReserved(res *structs.AllocatedDeviceResource) (collision bool) { + devInst, ok := d.devices[*res.ID()] + if !ok { + return false + } + + for _, id := range res.DeviceIDs { + cur, ok := devInst.instances[id] + if !ok { + continue + } + + if cur != 0 { + collision = true + } + + devInst.instances[id]++ + } + + return +} + +func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, err error) { + // Try to hot path + if len(d.devices) == 0 { + return nil, fmt.Errorf("no devices available") + } + if ask.Count == 0 { + return nil, fmt.Errorf("invalid request of zero devices") + } + + // Hold the current best offer + var offer *structs.AllocatedDeviceResource + var score float64 + + // Determine the devices that are feasible based on availability and + // constraints + for id, devInst := range d.devices { + // Check if we have enough unused instances to use this + assignable := uint64(0) + for _, v := range devInst.instances { + if v == 0 { + assignable++ + } + } + + // This device doesn't have enough instances + if assignable < ask.Count { + continue + } + + // Check if the device works + if !nodeDeviceMatches(d.ctx, devInst.d, ask) { + continue + } + + // Score the choice + var choiceScore float64 + if len(ask.Affinities) != 0 { + // TODO + } + + if offer != nil && choiceScore < score { + continue + } + + // Set the new highest score + score = choiceScore + + // Build the choice + offer = &structs.AllocatedDeviceResource{ + Vendor: id.Vendor, + Type: id.Type, + Name: id.Name, + DeviceIDs: make([]string, 0, ask.Count), + } + + assigned := uint64(0) + for id, v := range devInst.instances { + if v == 0 && assigned < ask.Count { + assigned++ + offer.DeviceIDs = append(offer.DeviceIDs, id) + if assigned == ask.Count { + break + } + } + } + } + + if offer == nil { + return nil, fmt.Errorf("no devices match request") + } + + return offer, nil +} diff --git a/scheduler/device_test.go b/scheduler/device_test.go new file mode 100644 index 000000000..95772060f --- /dev/null +++ b/scheduler/device_test.go @@ -0,0 +1,455 @@ +package scheduler + +import ( + "testing" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/require" +) + +func deviceRequest(name string, count uint64, + constraints []*structs.Constraint, affinities []*structs.Affinity) *structs.RequestedDevice { + return &structs.RequestedDevice{ + Name: name, + Count: count, + Constraints: constraints, + Affinities: affinities, + } +} + +func nvidiaAllocatedDevice() *structs.AllocatedDeviceResource { + return &structs.AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{uuid.Generate()}, + } +} + +func nvidiaAlloc() *structs.Allocation { + a := mock.Alloc() + a.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + nvidiaAllocatedDevice(), + } + return a +} + +func devNode() *structs.Node { + n := mock.NvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + Attributes: 
map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: false, + }, + }, + }) + return n +} + +func multipleNvidiaNode() *structs.Node { + n := mock.NvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(4352, ""), + "graphics_clock": psstructs.NewIntAttribute(1350, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(14, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }) + return n + +} + +// collectInstanceIDs returns the IDs of the device instances +func collectInstanceIDs(devices ...*structs.NodeDeviceResource) []string { + var out []string + for _, d := range devices { + for _, i := range d.Instances { + out = append(out, i.ID) + } + } + return out +} + +// Make sure that the device allocator works even if the node has no devices +func TestDeviceAllocator_AddAllocs_NoDeviceNode(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := mock.Node() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() + allocs := []*structs.Allocation{a1, a2, a3} + a3.DesiredStatus = structs.AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.devices, 0) +} + +// Add allocs to a node with a device +func TestDeviceAllocator_AddAllocs(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*structs.Allocation{a1, a2, a3} + a3.DesiredStatus = structs.AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.instances, 2) + require.Contains(nvidiaDevice.instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.instances, 1) + require.Equal(0, intelDevice.instances[intelDev0ID]) +} + +// Add alloc with unknown ID to a node with devices. 
This tests that we can +// operate on previous allocs even if the device has changed to unhealthy and we +// don't track it +func TestDeviceAllocator_AddAllocs_UnknownID(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() + + // a2 will have a random ID since it is generated + + allocs := []*structs.Allocation{a1, a2, a3} + a3.DesiredStatus = structs.AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.instances, 2) + for _, v := range nvidiaDevice.instances { + require.Equal(0, v) + } +} + +// Test that collision detection works +func TestDeviceAllocator_AddAllocs_Collision(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Create two allocations, both with the same device + a1, a2 := nvidiaAlloc(), nvidiaAlloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*structs.Allocation{a1, a2} + require.True(d.AddAllocs(allocs)) +} + +// Make sure that the device allocator works even if the node has no devices +func TestDeviceAllocator_AddReserved_NoDeviceNode(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := mock.Node() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + require.False(d.AddReserved(nvidiaAllocatedDevice())) + require.Len(d.devices, 0) +} + +// Add reserved to a node with a device +func TestDeviceAllocator_AddReserved(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + + require.False(d.AddReserved(res)) + require.Len(d.devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.instances, 2) + require.Contains(nvidiaDevice.instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.instances, 1) + require.Equal(0, intelDevice.instances[intelDev0ID]) +} + +// Test that collision detection works +func TestDeviceAllocator_AddReserved_Collision(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + + // Create an alloc with nvidia + a1 := nvidiaAlloc() + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + require.False(d.AddAllocs([]*structs.Allocation{a1})) + + // Reserve the 
same device + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + require.True(d.AddReserved(res)) +} + +// Test that asking for a device on a node with no devices doesn't work +func TestDeviceAllocator_Allocate_NoDeviceNode(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := mock.Node() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("nvidia/gpu", 1, nil, nil) + + out, err := d.AssignDevice(ask) + require.Nil(out) + require.Error(err) + require.Contains(err.Error(), "no devices available") +} + +// Test that asking for a device that isn't fully specified works. +func TestDeviceAllocator_Allocate_GenericRequest(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("gpu", 1, nil, nil) + + out, err := d.AssignDevice(ask) + require.NotNil(out) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(n.NodeResources.Devices[0]), out.DeviceIDs[0]) +} + +// Test that asking for a device that is fully specified works. +func TestDeviceAllocator_Allocate_FullyQualifiedRequest(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("intel/fpga/F100", 1, nil, nil) + + out, err := d.AssignDevice(ask) + require.NotNil(out) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(n.NodeResources.Devices[1]), out.DeviceIDs[0]) +} + +// Test that asking for a device with too much count doesn't place +func TestDeviceAllocator_Allocate_NotEnoughInstances(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("gpu", 4, nil, nil) + + out, err := d.AssignDevice(ask) + require.Nil(out) + require.Error(err) + require.Contains(err.Error(), "no devices match request") +} + +// Test that asking for a device with constraints works +func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { + n := multipleNvidiaNode() + nvidia0 := n.NodeResources.Devices[0] + nvidia1 := n.NodeResources.Devices[1] + + cases := []struct { + Name string + Constraints []*structs.Constraint + ExpectedDevice *structs.NodeDeviceResource + NoPlacement bool + }{ + { + Name: "gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + }, + }, + ExpectedDevice: nvidia1, + }, + { + Name: "gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: "<", + RTarget: "4000", + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "nvidia/gpu", + Constraints: []*structs.Constraint{ + // First two are shared across both devices + { + LTarget: "${driver.attr.memory_bandwidth}", + Operand: ">", + RTarget: "10 GB/s", + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + }, + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "intel/gpu", + NoPlacement: true, + }, + { + Name: "nvidia/gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.memory_bandwidth}", + 
Operand: ">", + RTarget: "10 GB/s", + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + }, + // Rules both out + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "2.4 GHz", + }, + }, + NoPlacement: true, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest(c.Name, 1, c.Constraints, nil) + + out, err := d.AssignDevice(ask) + if c.NoPlacement { + require.Nil(out) + } else { + require.NotNil(out) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(c.ExpectedDevice), out.DeviceIDs[0]) + } + }) + } +} + +// TODO +// Assign with priorities to pick the best one diff --git a/scheduler/feasible.go b/scheduler/feasible.go index aa37e0f3b..ef786b1a1 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -7,10 +7,9 @@ import ( "strconv" "strings" - psstructs "github.com/hashicorp/nomad/plugins/shared/structs" - "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) // FeasibleIterator is used to iteratively yield nodes that @@ -896,6 +895,8 @@ OUTER: continue } + // TODO invert the count logic since it is cheaper than checking if + // devices match if nodeDeviceMatches(c.ctx, d, req) { // Consume the instances if unused >= desiredCount { @@ -904,18 +905,19 @@ OUTER: // Move on to the next request continue OUTER - } else { - // This device partially satisfies our requests - available[d] = 0 - desiredCount -= unused - } + } // else { + // This device partially satisfies our requests + //available[d] = 0 + //desiredCount -= unused + //} } } + // TODO I don't think this behavior is desirable // We couldn't match the request for the device - if desiredCount > 0 { - return false - } + //if desiredCount > 0 { + return false + //} } // Only satisfied if there are no more devices to place diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index d1d5f6160..0c9c5c114 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1714,22 +1714,6 @@ func TestDeviceChecker(t *testing.T) { }, } - intel := &structs.NodeDeviceResource{ - Vendor: "intel", - Type: "gpu", - Name: "GT640", - Instances: []*structs.NodeDevice{ - { - ID: uuid.Generate(), - Healthy: true, - }, - { - ID: uuid.Generate(), - Healthy: false, - }, - }, - } - cases := []struct { Name string Result bool @@ -1796,12 +1780,12 @@ func TestDeviceChecker(t *testing.T) { NodeDevices: []*structs.NodeDeviceResource{nvidia}, RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, }, - { - Name: "request split over groups", - Result: true, - NodeDevices: []*structs.NodeDeviceResource{nvidia, intel}, - RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, - }, + //{ + //Name: "request split over groups", + //Result: true, + //NodeDevices: []*structs.NodeDeviceResource{nvidia, intel}, + //RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, + //}, { Name: "meets constraints requirement", Result: true, diff --git a/scheduler/rank.go b/scheduler/rank.go index 5e7ff8a3c..f986cb598 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -191,6 +191,10 @@ OUTER: netIdx.SetNode(option.Node) netIdx.AddAllocs(proposed) + // Create a device allocator + devAllocator := 
newDeviceAllocator(iter.ctx, option.Node) + devAllocator.AddAllocs(proposed) + // Assign the resources for each task total := &structs.AllocatedResources{ Tasks: make(map[string]*structs.AllocatedTaskResources, @@ -273,6 +277,17 @@ OUTER: taskResources.Networks = []*structs.NetworkResource{offer} } + // Check if we need to assign devices + for _, req := range task.Resources.Devices { + offer, err := devAllocator.AssignDevice(req) + if offer == nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) + continue OUTER + } + devAllocator.AddReserved(offer) + taskResources.Devices = append(taskResources.Devices, offer) + } + // Store the task resource option.SetTaskResources(task, taskResources) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index b60d7716e..31124c982 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -467,6 +467,280 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { } } +// This is a fairly high level test that asserts the bin packer uses the device +// allocator properly. It is not intended to handle every possible device +// request versus availability scenario. That should be covered in device +// allocator tests. +func TestBinPackIterator_Devices(t *testing.T) { + nvidiaNode := mock.NvidiaNode() + devs := nvidiaNode.NodeResources.Devices[0].Instances + nvidiaDevices := []string{devs[0].ID, devs[1].ID} + + nvidiaDev0 := mock.Alloc() + nvidiaDev0.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidiaDevices[0]}, + }, + } + + type devPlacementTuple struct { + Count int + ExcludeIDs []string + } + + cases := []struct { + Name string + Node *structs.Node + PlannedAllocs []*structs.Allocation + ExistingAllocs []*structs.Allocation + TaskGroup *structs.TaskGroup + NoPlace bool + ExpectedPlacements map[string]map[structs.DeviceIdTuple]devPlacementTuple + }{ + { + Name: "single request, match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + }, + }, + }, + }, + { + Name: "single request multiple count, match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 2, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 2, + }, + }, + }, + }, + { + Name: "single request over count, no match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 6, + }, + }, + }, + }, + }, + }, + NoPlace: true, + }, + { + Name: 
"single request no device of matching type", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "fpga", + Count: 1, + }, + }, + }, + }, + }, + }, + NoPlace: true, + }, + { + Name: "single request with previous uses", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + ExcludeIDs: []string{nvidiaDevices[0]}, + }, + }, + }, + ExistingAllocs: []*structs.Allocation{nvidiaDev0}, + }, + { + Name: "single request with planned uses", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + ExcludeIDs: []string{nvidiaDevices[0]}, + }, + }, + }, + PlannedAllocs: []*structs.Allocation{nvidiaDev0}, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + + // Setup the context + state, ctx := testContext(t) + + // Add the planned allocs + if len(c.PlannedAllocs) != 0 { + for _, alloc := range c.PlannedAllocs { + alloc.NodeID = c.Node.ID + } + plan := ctx.Plan() + plan.NodeAllocation[c.Node.ID] = c.PlannedAllocs + } + + // Add the existing allocs + if len(c.ExistingAllocs) != 0 { + for _, alloc := range c.ExistingAllocs { + alloc.NodeID = c.Node.ID + } + require.NoError(state.UpsertAllocs(1000, c.ExistingAllocs)) + } + + static := NewStaticRankIterator(ctx, []*RankedNode{&RankedNode{Node: c.Node}}) + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTaskGroup(c.TaskGroup) + + out := binp.Next() + if out == nil && !c.NoPlace { + t.Fatalf("expected placement") + } + + // Check we got the placements we are expecting + for tname, devices := range c.ExpectedPlacements { + tr, ok := out.TaskResources[tname] + require.True(ok) + + want := len(devices) + got := 0 + for _, placed := range tr.Devices { + got++ + + expected, ok := devices[*placed.ID()] + require.True(ok) + require.Equal(expected.Count, len(placed.DeviceIDs)) + for _, id := range expected.ExcludeIDs { + require.NotContains(placed.DeviceIDs, id) + } + } + + require.Equal(want, got) + } + }) + } +} + func TestJobAntiAffinity_PlannedAlloc(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{ From 6fa893c801168597a9403468014fa31ed0d538a8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 17 Oct 2018 11:04:54 -0700 Subject: [PATCH 3/6] affinities --- nomad/mock/mock.go | 1 + scheduler/device.go | 71 +++++++++++++++------ scheduler/device_test.go | 124 ++++++++++++++++++++++++++++++++++--- scheduler/feasible.go | 39 +++++------- scheduler/feasible_test.go | 6 -- 
scheduler/rank.go | 22 ++++++- scheduler/rank_test.go | 49 +++++++++++++++ 7 files changed, 257 insertions(+), 55 deletions(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index ea34170a5..04ce66ecf 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -92,6 +92,7 @@ func Node() *structs.Node { return node } +// NvidiaNode returns a node with two instances of an Nvidia GPU func NvidiaNode() *structs.Node { n := Node() n.NodeResources.Devices = []*structs.NodeDeviceResource{ diff --git a/scheduler/device.go b/scheduler/device.go index 94f683904..e8e9e3a94 100644 --- a/scheduler/device.go +++ b/scheduler/device.go @@ -6,22 +6,27 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// deviceAllocator is used to allocate devices to allocations. The allocator +// tracks availability as to not double allocate devices. type deviceAllocator struct { ctx Context devices map[structs.DeviceIdTuple]*deviceAllocatorInstance } +// deviceAllocatorInstance wraps a device and adds tracking to the instances of +// the device to determine if they are free or not. type deviceAllocatorInstance struct { - d *structs.NodeDeviceResource + // d is the device being wrapped + d *structs.NodeDeviceResource + + // instances is a mapping of the device IDs of the instances to their usage. + // Only a value of 0 indicates that the instance is unused. instances map[string]int } -// Free returns if the device is free to use. -func (d *deviceAllocatorInstance) Free(id string) bool { - uses, ok := d.instances[id] - return ok && uses == 0 -} - +// newDeviceAllocator returns a new device allocator. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { numDevices := 0 var devices []*structs.NodeDeviceResource @@ -57,7 +62,8 @@ func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { } // AddAllocs takes a set of allocations and internally marks which devices are -// used. +// used. If a device is used more than once by the set of passed allocations, +// the collision will be returned as true. func (d *deviceAllocator) AddAllocs(allocs []*structs.Allocation) (collision bool) { for _, a := range allocs { // Filter any terminal allocation @@ -103,18 +109,23 @@ func (d *deviceAllocator) AddAllocs(allocs []*structs.Allocation) (collision boo return } +// AddReserved marks the device instances in the passed device reservation as +// used and returns if there is a collision. func (d *deviceAllocator) AddReserved(res *structs.AllocatedDeviceResource) (collision bool) { + // Lookup the device. devInst, ok := d.devices[*res.ID()] if !ok { return false } + // For each reserved instance, mark it as used for _, id := range res.DeviceIDs { cur, ok := devInst.instances[id] if !ok { continue } + // It has already been used, so mark that there is a collision if cur != 0 { collision = true } @@ -125,18 +136,21 @@ func (d *deviceAllocator) AddReserved(res *structs.AllocatedDeviceResource) (col return } -func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, err error) { +// AssignDevice takes a device request and returns an assignment as well as a +// score for the assignment. If no assignment could be made, an error is +// returned explaining why. 
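Before the function body, a usage sketch of the new signature (illustrative only; it mirrors what the rank.go hunk later in this patch does, but the function name and error handling here are assumptions): the caller treats a nil offer as node exhaustion, reserves the offered instances so a later request cannot be handed the same IDs, and only lets requests that expressed affinities contribute to the score.

// assignTaskDevices is a hypothetical caller of AssignDevice, shown for
// illustration; it is not part of the patch.
func assignTaskDevices(alloc *deviceAllocator, reqs []*structs.RequestedDevice) ([]*structs.AllocatedDeviceResource, float64, error) {
	var out []*structs.AllocatedDeviceResource
	var scoreSum float64
	withAffinities := 0

	for _, req := range reqs {
		offer, score, err := alloc.AssignDevice(req)
		if offer == nil {
			// No feasible device group on this node.
			return nil, 0, fmt.Errorf("devices: %v", err)
		}

		// Mark the offered instances as used before handling the next request.
		alloc.AddReserved(offer)
		out = append(out, offer)

		// Only requests with affinities contribute to the placement score.
		if len(req.Affinities) != 0 {
			withAffinities++
			scoreSum += score
		}
	}

	if withAffinities != 0 {
		scoreSum /= float64(withAffinities)
	}
	return out, scoreSum, nil
}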
+func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, score float64, err error) { // Try to hot path if len(d.devices) == 0 { - return nil, fmt.Errorf("no devices available") + return nil, 0.0, fmt.Errorf("no devices available") } if ask.Count == 0 { - return nil, fmt.Errorf("invalid request of zero devices") + return nil, 0.0, fmt.Errorf("invalid request of zero devices") } // Hold the current best offer var offer *structs.AllocatedDeviceResource - var score float64 + var offerScore float64 // Determine the devices that are feasible based on availability and // constraints @@ -161,16 +175,36 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc // Score the choice var choiceScore float64 - if len(ask.Affinities) != 0 { - // TODO + if l := len(ask.Affinities); l != 0 { + for _, a := range ask.Affinities { + // Resolve the targets + lVal, ok := resolveDeviceTarget(a.LTarget, devInst.d) + if !ok { + continue + } + rVal, ok := resolveDeviceTarget(a.RTarget, devInst.d) + if !ok { + continue + } + + // Check if satisfied + if !checkAttributeAffinity(d.ctx, a.Operand, lVal, rVal) { + continue + } + choiceScore += a.Weight + } + + // normalize + choiceScore /= float64(l) } - if offer != nil && choiceScore < score { + // Only use the device if it is a higher score than we have already seen + if offer != nil && choiceScore < offerScore { continue } // Set the new highest score - score = choiceScore + offerScore = choiceScore // Build the choice offer = &structs.AllocatedDeviceResource{ @@ -192,9 +226,10 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc } } + // Failed to find a match if offer == nil { - return nil, fmt.Errorf("no devices match request") + return nil, 0.0, fmt.Errorf("no devices match request") } - return offer, nil + return offer, offerScore, nil } diff --git a/scheduler/device_test.go b/scheduler/device_test.go index 95772060f..b097050e7 100644 --- a/scheduler/device_test.go +++ b/scheduler/device_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/require" ) +// deviceRequest takes the name, count and potential constraints and affinities +// and returns a device request. func deviceRequest(name string, count uint64, constraints []*structs.Constraint, affinities []*structs.Affinity) *structs.RequestedDevice { return &structs.RequestedDevice{ @@ -20,6 +22,7 @@ func deviceRequest(name string, count uint64, } } +// nvidiaAllocatedDevice returns an allocated nvidia device func nvidiaAllocatedDevice() *structs.AllocatedDeviceResource { return &structs.AllocatedDeviceResource{ Type: "gpu", @@ -29,6 +32,7 @@ func nvidiaAllocatedDevice() *structs.AllocatedDeviceResource { } } +// nvidiaAlloc returns an allocation that has been assigned an nvidia device. func nvidiaAlloc() *structs.Allocation { a := mock.Alloc() a.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ @@ -37,6 +41,8 @@ func nvidiaAlloc() *structs.Allocation { return a } +// devNode returns a node containing two devices, an nvidia gpu and an intel +// FPGA. func devNode() *structs.Node { n := mock.NvidiaNode() n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ @@ -60,6 +66,7 @@ func devNode() *structs.Node { return n } +// multipleNvidiaNode returns a node containing multiple nvidia device types. 
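One piece of the scoring above worth spelling out: for each feasible device the allocator sums the weights of the affinities that device satisfies and then divides by the total number of affinities asked for, so unsatisfied (or negatively weighted) affinities pull the normalized score down. A standalone sketch of that arithmetic (hypothetical helper, not the scheduler's code):

// normalizedAffinityScore mirrors the normalization in AssignDevice: sum the
// weights of satisfied affinities, then divide by how many affinities were requested.
func normalizedAffinityScore(weights []float64, satisfied []bool) float64 {
	if len(weights) == 0 {
		return 0
	}
	var sum float64
	for i, w := range weights {
		if satisfied[i] {
			sum += w
		}
	}
	return sum / float64(len(weights))
}

// normalizedAffinityScore([]float64{0.2, 0.2, 0.9}, []bool{true, true, true})
//   => (0.2 + 0.2 + 0.9) / 3 ≈ 0.43
// normalizedAffinityScore([]float64{0.9}, []bool{true})
//   => 0.9, which matches the DeviceScore the rank test below expects.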
func multipleNvidiaNode() *structs.Node { n := mock.NvidiaNode() n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ @@ -279,9 +286,10 @@ func TestDeviceAllocator_Allocate_NoDeviceNode(t *testing.T) { // Build the request ask := deviceRequest("nvidia/gpu", 1, nil, nil) - out, err := d.AssignDevice(ask) + out, score, err := d.AssignDevice(ask) require.Nil(out) require.Error(err) + require.Zero(score) require.Contains(err.Error(), "no devices available") } @@ -296,8 +304,9 @@ func TestDeviceAllocator_Allocate_GenericRequest(t *testing.T) { // Build the request ask := deviceRequest("gpu", 1, nil, nil) - out, err := d.AssignDevice(ask) + out, score, err := d.AssignDevice(ask) require.NotNil(out) + require.Zero(score) require.NoError(err) // Check that we got the nvidia device @@ -316,8 +325,9 @@ func TestDeviceAllocator_Allocate_FullyQualifiedRequest(t *testing.T) { // Build the request ask := deviceRequest("intel/fpga/F100", 1, nil, nil) - out, err := d.AssignDevice(ask) + out, score, err := d.AssignDevice(ask) require.NotNil(out) + require.Zero(score) require.NoError(err) // Check that we got the nvidia device @@ -336,7 +346,7 @@ func TestDeviceAllocator_Allocate_NotEnoughInstances(t *testing.T) { // Build the request ask := deviceRequest("gpu", 4, nil, nil) - out, err := d.AssignDevice(ask) + out, _, err := d.AssignDevice(ask) require.Nil(out) require.Error(err) require.Contains(err.Error(), "no devices match request") @@ -436,11 +446,12 @@ func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { // Build the request ask := deviceRequest(c.Name, 1, c.Constraints, nil) - out, err := d.AssignDevice(ask) + out, score, err := d.AssignDevice(ask) if c.NoPlacement { require.Nil(out) } else { require.NotNil(out) + require.Zero(score) require.NoError(err) // Check that we got the nvidia device @@ -451,5 +462,104 @@ func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { } } -// TODO -// Assign with priorities to pick the best one +// Test that asking for a device with affinities works +func TestDeviceAllocator_Allocate_Affinities(t *testing.T) { + n := multipleNvidiaNode() + nvidia0 := n.NodeResources.Devices[0] + nvidia1 := n.NodeResources.Devices[1] + + cases := []struct { + Name string + Affinities []*structs.Affinity + ExpectedDevice *structs.NodeDeviceResource + ZeroScore bool + }{ + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + Weight: 0.6, + }, + }, + ExpectedDevice: nvidia1, + }, + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: "<", + RTarget: "4000", + Weight: 0.1, + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + Weight: -0.2, + }, + }, + ExpectedDevice: nvidia0, + ZeroScore: true, + }, + { + Name: "nvidia/gpu", + Affinities: []*structs.Affinity{ + // First two are shared across both devices + { + LTarget: "${driver.attr.memory_bandwidth}", + Operand: ">", + RTarget: "10 GB/s", + Weight: 0.2, + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + Weight: 0.2, + }, + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + Weight: 0.9, + }, + }, + ExpectedDevice: nvidia0, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + d := 
newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest(c.Name, 1, nil, c.Affinities) + + out, score, err := d.AssignDevice(ask) + require.NotNil(out) + require.NoError(err) + if c.ZeroScore { + require.Zero(score) + } else { + require.NotZero(score) + } + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(c.ExpectedDevice), out.DeviceIDs[0]) + }) + } +} diff --git a/scheduler/feasible.go b/scheduler/feasible.go index ef786b1a1..872d5a81c 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -480,14 +480,12 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { // checkAffinity checks if a specific affinity is satisfied func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool { - switch operand { - case structs.ConstraintSetContainsAny: - return checkSetContainsAny(lVal, rVal) - case structs.ConstraintSetContainsAll, structs.ConstraintSetContains: - return checkSetContainsAll(ctx, lVal, rVal) - default: - return checkConstraint(ctx, operand, lVal, rVal) - } + return checkConstraint(ctx, operand, lVal, rVal) +} + +// checkAttributeAffinity checks if an affinity is satisfied +func checkAttributeAffinity(ctx Context, operand string, lVal, rVal *psstructs.Attribute) bool { + return checkAttributeConstraint(ctx, operand, lVal, rVal) } // checkLexicalOrder is used to check for lexical ordering @@ -895,29 +893,24 @@ OUTER: continue } - // TODO invert the count logic since it is cheaper than checking if - // devices match + // First check we have enough instances of the device since this is + // cheaper than checking the constraints + if unused < desiredCount { + continue + } + + // Check the constriants if nodeDeviceMatches(c.ctx, d, req) { // Consume the instances - if unused >= desiredCount { - // This device satisfies all our requests - available[d] -= desiredCount + available[d] -= desiredCount - // Move on to the next request - continue OUTER - } // else { - // This device partially satisfies our requests - //available[d] = 0 - //desiredCount -= unused - //} + // Move on to the next request + continue OUTER } } - // TODO I don't think this behavior is desirable // We couldn't match the request for the device - //if desiredCount > 0 { return false - //} } // Only satisfied if there are no more devices to place diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 0c9c5c114..abbdcb484 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1780,12 +1780,6 @@ func TestDeviceChecker(t *testing.T) { NodeDevices: []*structs.NodeDeviceResource{nvidia}, RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, }, - //{ - //Name: "request split over groups", - //Result: true, - //NodeDevices: []*structs.NodeDeviceResource{nvidia, intel}, - //RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, - //}, { Name: "meets constraints requirement", Result: true, diff --git a/scheduler/rank.go b/scheduler/rank.go index f986cb598..e313f1d6b 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -195,6 +195,10 @@ OUTER: devAllocator := newDeviceAllocator(iter.ctx, option.Node) devAllocator.AddAllocs(proposed) + // Track the affinities of the devices + devicesWithAffinities := 0 + deviceAffinityScore := 0.0 + // Assign the resources for each task total := &structs.AllocatedResources{ Tasks: make(map[string]*structs.AllocatedTaskResources, @@ -279,13 +283,21 @@ OUTER: // Check if we need 
to assign devices for _, req := range task.Resources.Devices { - offer, err := devAllocator.AssignDevice(req) + offer, score, err := devAllocator.AssignDevice(req) if offer == nil { iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) continue OUTER } + + // Store the resource devAllocator.AddReserved(offer) taskResources.Devices = append(taskResources.Devices, offer) + + // Add the scores + if len(req.Affinities) != 0 { + devicesWithAffinities++ + deviceAffinityScore += score + } } // Store the task resource @@ -336,6 +348,14 @@ OUTER: normalizedFit := fitness / binPackingMaxFitScore option.Scores = append(option.Scores, normalizedFit) iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit) + + // Score the device affinity + if devicesWithAffinities != 0 { + deviceAffinityScore /= float64(devicesWithAffinities) + option.Scores = append(option.Scores, deviceAffinityScore) + iter.ctx.Metrics().ScoreNode(option.Node, "devices", deviceAffinityScore) + } + return option } } diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 31124c982..61e3dee7a 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -499,6 +499,7 @@ func TestBinPackIterator_Devices(t *testing.T) { TaskGroup *structs.TaskGroup NoPlace bool ExpectedPlacements map[string]map[structs.DeviceIdTuple]devPlacementTuple + DeviceScore float64 }{ { Name: "single request, match", @@ -566,6 +567,48 @@ func TestBinPackIterator_Devices(t *testing.T) { }, }, }, + { + Name: "single request, with affinities", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + Weight: 0.9, + }, + }, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + }, + }, + }, + DeviceScore: 0.9, + }, { Name: "single request over count, no match", Node: nvidiaNode, @@ -737,6 +780,12 @@ func TestBinPackIterator_Devices(t *testing.T) { require.Equal(want, got) } + + // Check potential affinity scores + if c.DeviceScore != 0.0 { + require.Len(out.Scores, 2) + require.Equal(out.Scores[1], c.DeviceScore) + } }) } } From 4f9b3ede879b917bdb4d878f205415a6cf5ca488 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 17 Oct 2018 14:26:25 -0700 Subject: [PATCH 4/6] Split device accounter and allocator --- nomad/structs/devices.go | 130 ++++++++++++++++++ nomad/structs/devices_test.go | 216 ++++++++++++++++++++++++++++++ nomad/structs/testing.go | 245 ++++++++++++++++++++++++++++++++++ scheduler/device.go | 136 ++----------------- scheduler/device_test.go | 207 ---------------------------- 5 files changed, 603 insertions(+), 331 deletions(-) create mode 100644 nomad/structs/devices.go create mode 100644 nomad/structs/devices_test.go diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go new file mode 100644 index 000000000..6f179f42c --- /dev/null +++ b/nomad/structs/devices.go @@ -0,0 +1,130 @@ +package structs + +// DeviceAccounter is used to account for device usage on a node. 
It can detect +// when a node is oversubscribed and can be used for deciding what devices are +// free +type DeviceAccounter struct { + Devices map[DeviceIdTuple]*DeviceAccounterInstance +} + +// DeviceAccounterInstance wraps a device and adds tracking to the instances of +// the device to determine if they are free or not. +type DeviceAccounterInstance struct { + // Device is the device being wrapped + Device *NodeDeviceResource + + // Instances is a mapping of the device IDs of the instances to their usage. + // Only a value of 0 indicates that the instance is unused. + Instances map[string]int +} + +// AddAllocs takes a set of allocations and internally marks which devices are +// used. If a device is used more than once by the set of passed allocations, +// the collision will be returned as true. +func (d *DeviceAccounter) AddAllocs(allocs []*Allocation) (collision bool) { + for _, a := range allocs { + // Filter any terminal allocation + if a.TerminalStatus() { + continue + } + + // COMPAT(0.11): Remove in 0.11 + // If the alloc doesn't have the new style resources, it can't have + // devices + if a.AllocatedResources == nil { + continue + } + + // Go through each task resource + for _, tr := range a.AllocatedResources.Tasks { + + // Go through each assigned device group + for _, device := range tr.Devices { + devID := device.ID() + + // Go through each assigned device + for _, instanceID := range device.DeviceIDs { + + // Mark that we are using the device. It may not be in the + // map if the device is no longer being fingerprinted, is + // unhealthy, etc. + if devInst, ok := d.Devices[*devID]; ok { + if i, ok := devInst.Instances[instanceID]; ok { + // Mark that the device is in use + devInst.Instances[instanceID]++ + + if i != 0 { + collision = true + } + } + } + } + } + } + } + + return +} + +// AddReserved marks the device instances in the passed device reservation as +// used and returns if there is a collision. +func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision bool) { + // Lookup the device. + devInst, ok := d.Devices[*res.ID()] + if !ok { + return false + } + + // For each reserved instance, mark it as used + for _, id := range res.DeviceIDs { + cur, ok := devInst.Instances[id] + if !ok { + continue + } + + // It has already been used, so mark that there is a collision + if cur != 0 { + collision = true + } + + devInst.Instances[id]++ + } + + return +} + +// NewDeviceAccounter returns a new device accounter. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. 
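Ahead of the constructor body, a usage sketch of the accounter (illustrative only; devNode and nvidiaAlloc stand in for the test helpers added in devices_test.go below): build the accounter from a node, feed it the existing allocations, and treat any instance whose usage count is still zero as free.

// freeDeviceInstances is a hypothetical helper, not part of the patch.
func freeDeviceInstances() []string {
	node := devNode()
	acct := NewDeviceAccounter(node)

	// A true return value means some instance was counted more than once,
	// i.e. the node is oversubscribed.
	if collision := acct.AddAllocs([]*Allocation{nvidiaAlloc()}); collision {
		// handle oversubscription, e.g. force a reschedule
	}

	// Instances with a usage count of zero are unused and may be handed out.
	var free []string
	for _, inst := range acct.Devices {
		for id, uses := range inst.Instances {
			if uses == 0 {
				free = append(free, id)
			}
		}
	}
	return free
}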
+func NewDeviceAccounter(n *Node) *DeviceAccounter { + numDevices := 0 + var devices []*NodeDeviceResource + + // COMPAT(0.11): Remove in 0.11 + if n.NodeResources != nil { + numDevices = len(n.NodeResources.Devices) + devices = n.NodeResources.Devices + } + + d := &DeviceAccounter{ + Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), + } + + for _, dev := range devices { + id := *dev.ID() + d.Devices[id] = &DeviceAccounterInstance{ + Device: dev, + Instances: make(map[string]int, len(dev.Instances)), + } + for _, instance := range dev.Instances { + // Skip unhealthy devices as they aren't allocatable + if !instance.Healthy { + continue + } + + d.Devices[id].Instances[instance.ID] = 0 + } + } + + return d +} diff --git a/nomad/structs/devices_test.go b/nomad/structs/devices_test.go new file mode 100644 index 000000000..013b6fcec --- /dev/null +++ b/nomad/structs/devices_test.go @@ -0,0 +1,216 @@ +package structs + +import ( + "testing" + + "github.com/hashicorp/nomad/helper/uuid" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/require" +) + +// nvidiaAllocatedDevice returns an allocated nvidia device +func nvidiaAllocatedDevice() *AllocatedDeviceResource { + return &AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{uuid.Generate()}, + } +} + +// nvidiaAlloc returns an allocation that has been assigned an nvidia device. +func nvidiaAlloc() *Allocation { + a := MockAlloc() + a.AllocatedResources.Tasks["web"].Devices = []*AllocatedDeviceResource{ + nvidiaAllocatedDevice(), + } + return a +} + +// devNode returns a node containing two devices, an nvidia gpu and an intel +// FPGA. +func devNode() *Node { + n := MockNvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &NodeDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + }, + Instances: []*NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: false, + }, + }, + }) + return n +} + +// Make sure that the device accounter works even if the node has no devices +func TestDeviceAccounter_AddAllocs_NoDeviceNode(t *testing.T) { + require := require.New(t) + n := MockNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 0) +} + +// Add allocs to a node with a device +func TestDeviceAccounter_AddAllocs(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + 
require.Len(nvidiaDevice.Instances, 2) + require.Contains(nvidiaDevice.Instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.Instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.Devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.Instances, 1) + require.Equal(0, intelDevice.Instances[intelDev0ID]) +} + +// Add alloc with unknown ID to a node with devices. This tests that we can +// operate on previous allocs even if the device has changed to unhealthy and we +// don't track it +func TestDeviceAccounter_AddAllocs_UnknownID(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + + // a2 will have a random ID since it is generated + + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.Instances, 2) + for _, v := range nvidiaDevice.Instances { + require.Equal(0, v) + } +} + +// Test that collision detection works +func TestDeviceAccounter_AddAllocs_Collision(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create two allocations, both with the same device + a1, a2 := nvidiaAlloc(), nvidiaAlloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*Allocation{a1, a2} + require.True(d.AddAllocs(allocs)) +} + +// Make sure that the device allocator works even if the node has no devices +func TestDeviceAccounter_AddReserved_NoDeviceNode(t *testing.T) { + require := require.New(t) + n := MockNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + require.False(d.AddReserved(nvidiaAllocatedDevice())) + require.Len(d.Devices, 0) +} + +// Add reserved to a node with a device +func TestDeviceAccounter_AddReserved(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + + require.False(d.AddReserved(res)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.Instances, 2) + require.Contains(nvidiaDevice.Instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.Instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.Devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.Instances, 1) + require.Equal(0, intelDevice.Instances[intelDev0ID]) +} + +// Test that collision detection works +func TestDeviceAccounter_AddReserved_Collision(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) 
+ require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + + // Create an alloc with nvidia + a1 := nvidiaAlloc() + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + require.False(d.AddAllocs([]*Allocation{a1})) + + // Reserve the same device + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + require.True(d.AddReserved(res)) +} diff --git a/nomad/structs/testing.go b/nomad/structs/testing.go index bbe2a2e58..7f4435366 100644 --- a/nomad/structs/testing.go +++ b/nomad/structs/testing.go @@ -1,5 +1,13 @@ package structs +import ( + "fmt" + "time" + + "github.com/hashicorp/nomad/helper/uuid" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" +) + // NodeResourcesToAllocatedResources converts a node resources to an allocated // resources. The task name used is "web" and network is omitted. This is // useful when trying to make an allocation fill an entire node. @@ -24,3 +32,240 @@ func NodeResourcesToAllocatedResources(n *NodeResources) *AllocatedResources { }, } } + +func MockNode() *Node { + node := &Node{ + ID: uuid.Generate(), + SecretID: uuid.Generate(), + Datacenter: "dc1", + Name: "foobar", + Attributes: map[string]string{ + "kernel.name": "linux", + "arch": "x86", + "nomad.version": "0.5.0", + "driver.exec": "1", + "driver.mock_driver": "1", + }, + NodeResources: &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: 4000, + }, + Memory: NodeMemoryResources{ + MemoryMB: 8192, + }, + Disk: NodeDiskResources{ + DiskMB: 100 * 1024, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Cpu: NodeReservedCpuResources{ + CpuShares: 100, + }, + Memory: NodeReservedMemoryResources{ + MemoryMB: 256, + }, + Disk: NodeReservedDiskResources{ + DiskMB: 4 * 1024, + }, + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: "22", + }, + }, + Links: map[string]string{ + "consul": "foobar.dc1", + }, + Meta: map[string]string{ + "pci-dss": "true", + "database": "mysql", + "version": "5.6", + }, + NodeClass: "linux-medium-pci", + Status: NodeStatusReady, + SchedulingEligibility: NodeSchedulingEligible, + } + node.ComputeClass() + return node +} + +// NvidiaNode returns a node with two instances of an Nvidia GPU +func MockNvidiaNode() *Node { + n := MockNode() + n.NodeResources.Devices = []*NodeDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }, + } + n.ComputeClass() + return n +} + +func MockJob() *Job { + job := &Job{ + Region: "global", + ID: fmt.Sprintf("mock-service-%s", uuid.Generate()), + Name: "my-job", + Namespace: DefaultNamespace, + Type: JobTypeService, + Priority: 50, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*Constraint{ + { + LTarget: "${attr.kernel.name}", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*TaskGroup{ + { + Name: "web", + Count: 10, + EphemeralDisk: &EphemeralDisk{ + SizeMB: 150, + }, + RestartPolicy: &RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + 
Delay: 1 * time.Minute, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 2, + Interval: 10 * time.Minute, + Delay: 5 * time.Second, + DelayFunction: "constant", + }, + Migrate: DefaultMigrateStrategy(), + Tasks: []*Task{ + { + Name: "web", + Driver: "exec", + Config: map[string]interface{}{ + "command": "/bin/date", + }, + Env: map[string]string{ + "FOO": "bar", + }, + Services: []*Service{ + { + Name: "${TASK}-frontend", + PortLabel: "http", + Tags: []string{"pci:${meta.pci-dss}", "datacenter:${node.datacenter}"}, + Checks: []*ServiceCheck{ + { + Name: "check-table", + Type: ServiceCheckScript, + Command: "/usr/local/check-table-${meta.database}", + Args: []string{"${meta.version}"}, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + }, + }, + { + Name: "${TASK}-admin", + PortLabel: "admin", + }, + }, + LogConfig: DefaultLogConfig(), + Resources: &Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + { + MBits: 50, + DynamicPorts: []Port{ + {Label: "http"}, + {Label: "admin"}, + }, + }, + }, + }, + Meta: map[string]string{ + "foo": "bar", + }, + }, + }, + Meta: map[string]string{ + "elb_check_type": "http", + "elb_check_interval": "30s", + "elb_check_min": "3", + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + Status: JobStatusPending, + Version: 0, + CreateIndex: 42, + ModifyIndex: 99, + JobModifyIndex: 99, + } + job.Canonicalize() + return job +} + +func MockAlloc() *Allocation { + alloc := &Allocation{ + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: "12345678-abcd-efab-cdef-123456789abc", + Namespace: DefaultNamespace, + TaskGroup: "web", + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 256, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 150, + }, + }, + Job: MockJob(), + DesiredStatus: AllocDesiredStatusRun, + ClientStatus: AllocClientStatusPending, + } + alloc.JobID = alloc.Job.ID + return alloc +} diff --git a/scheduler/device.go b/scheduler/device.go index e8e9e3a94..a922bee10 100644 --- a/scheduler/device.go +++ b/scheduler/device.go @@ -9,131 +9,19 @@ import ( // deviceAllocator is used to allocate devices to allocations. The allocator // tracks availability as to not double allocate devices. type deviceAllocator struct { - ctx Context - devices map[structs.DeviceIdTuple]*deviceAllocatorInstance -} + *structs.DeviceAccounter -// deviceAllocatorInstance wraps a device and adds tracking to the instances of -// the device to determine if they are free or not. -type deviceAllocatorInstance struct { - // d is the device being wrapped - d *structs.NodeDeviceResource - - // instances is a mapping of the device IDs of the instances to their usage. - // Only a value of 0 indicates that the instance is unused. - instances map[string]int + ctx Context } // newDeviceAllocator returns a new device allocator. The node is used to // populate the set of available devices based on what healthy device instances // exist on the node. 
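Because deviceAllocator now embeds *structs.DeviceAccounter, AddAllocs, AddReserved, and the Devices map are promoted onto the allocator, so the scheduler's existing call sites keep working while the accounting logic lives in the structs package; only construction and the AssignDevice scoring remain here. A rough sketch of that delegation from the scheduler side, with ctx, node, and proposed as placeholders:

	d := newDeviceAllocator(ctx, node)

	// Promoted from structs.DeviceAccounter: mark the devices used by the
	// allocations proposed for this node.
	d.AddAllocs(proposed)

	// Accounter state is reachable directly through the embedded field.
	for _, inst := range d.Devices {
		_ = inst.Instances // instance ID -> usage count
	}
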
func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { - numDevices := 0 - var devices []*structs.NodeDeviceResource - - // COMPAT(0.11): Remove in 0.11 - if n.NodeResources != nil { - numDevices = len(n.NodeResources.Devices) - devices = n.NodeResources.Devices + return &deviceAllocator{ + ctx: ctx, + DeviceAccounter: structs.NewDeviceAccounter(n), } - - d := &deviceAllocator{ - ctx: ctx, - devices: make(map[structs.DeviceIdTuple]*deviceAllocatorInstance, numDevices), - } - - for _, dev := range devices { - id := *dev.ID() - d.devices[id] = &deviceAllocatorInstance{ - d: dev, - instances: make(map[string]int, len(dev.Instances)), - } - for _, instance := range dev.Instances { - // Skip unhealthy devices as they aren't allocatable - if !instance.Healthy { - continue - } - - d.devices[id].instances[instance.ID] = 0 - } - } - - return d -} - -// AddAllocs takes a set of allocations and internally marks which devices are -// used. If a device is used more than once by the set of passed allocations, -// the collision will be returned as true. -func (d *deviceAllocator) AddAllocs(allocs []*structs.Allocation) (collision bool) { - for _, a := range allocs { - // Filter any terminal allocation - if a.TerminalStatus() { - continue - } - - // COMPAT(0.11): Remove in 0.11 - // If the alloc doesn't have the new style resources, it can't have - // devices - if a.AllocatedResources == nil { - continue - } - - // Go through each task resource - for _, tr := range a.AllocatedResources.Tasks { - - // Go through each assigned device group - for _, device := range tr.Devices { - devID := device.ID() - - // Go through each assigned device - for _, instanceID := range device.DeviceIDs { - - // Mark that we are using the device. It may not be in the - // map if the device is no longer being fingerprinted, is - // unhealthy, etc. - if devInst, ok := d.devices[*devID]; ok { - if i, ok := devInst.instances[instanceID]; ok { - // Mark that the device is in use - devInst.instances[instanceID]++ - - if i != 0 { - collision = true - } - } - } - } - } - } - } - - return -} - -// AddReserved marks the device instances in the passed device reservation as -// used and returns if there is a collision. -func (d *deviceAllocator) AddReserved(res *structs.AllocatedDeviceResource) (collision bool) { - // Lookup the device. - devInst, ok := d.devices[*res.ID()] - if !ok { - return false - } - - // For each reserved instance, mark it as used - for _, id := range res.DeviceIDs { - cur, ok := devInst.instances[id] - if !ok { - continue - } - - // It has already been used, so mark that there is a collision - if cur != 0 { - collision = true - } - - devInst.instances[id]++ - } - - return } // AssignDevice takes a device request and returns an assignment as well as a @@ -141,7 +29,7 @@ func (d *deviceAllocator) AddReserved(res *structs.AllocatedDeviceResource) (col // returned explaining why. 
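For context, the calling convention is to build a structs.RequestedDevice ask and let the allocator pick concrete instance IDs. The literal below is a sketch that assumes the Name/Count fields exercised by the deviceRequest test helper later in this series; d is a *deviceAllocator already fed the node's existing allocations:

	ask := &structs.RequestedDevice{
		Name:  "nvidia/gpu",
		Count: 1,
	}

	offer, score, err := d.AssignDevice(ask)
	if err != nil {
		return nil, 0, err // e.g. "no devices available"
	}

	// offer.DeviceIDs holds the chosen instance IDs; score reflects how well
	// the selected device group matched the request's affinities.
	return offer, score, nil
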
func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, score float64, err error) { // Try to hot path - if len(d.devices) == 0 { + if len(d.Devices) == 0 { return nil, 0.0, fmt.Errorf("no devices available") } if ask.Count == 0 { @@ -154,10 +42,10 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc // Determine the devices that are feasible based on availability and // constraints - for id, devInst := range d.devices { + for id, devInst := range d.Devices { // Check if we have enough unused instances to use this assignable := uint64(0) - for _, v := range devInst.instances { + for _, v := range devInst.Instances { if v == 0 { assignable++ } @@ -169,7 +57,7 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc } // Check if the device works - if !nodeDeviceMatches(d.ctx, devInst.d, ask) { + if !nodeDeviceMatches(d.ctx, devInst.Device, ask) { continue } @@ -178,11 +66,11 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc if l := len(ask.Affinities); l != 0 { for _, a := range ask.Affinities { // Resolve the targets - lVal, ok := resolveDeviceTarget(a.LTarget, devInst.d) + lVal, ok := resolveDeviceTarget(a.LTarget, devInst.Device) if !ok { continue } - rVal, ok := resolveDeviceTarget(a.RTarget, devInst.d) + rVal, ok := resolveDeviceTarget(a.RTarget, devInst.Device) if !ok { continue } @@ -215,7 +103,7 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc } assigned := uint64(0) - for id, v := range devInst.instances { + for id, v := range devInst.Instances { if v == 0 && assigned < ask.Count { assigned++ offer.DeviceIDs = append(offer.DeviceIDs, id) diff --git a/scheduler/device_test.go b/scheduler/device_test.go index b097050e7..36643555f 100644 --- a/scheduler/device_test.go +++ b/scheduler/device_test.go @@ -22,25 +22,6 @@ func deviceRequest(name string, count uint64, } } -// nvidiaAllocatedDevice returns an allocated nvidia device -func nvidiaAllocatedDevice() *structs.AllocatedDeviceResource { - return &structs.AllocatedDeviceResource{ - Type: "gpu", - Vendor: "nvidia", - Name: "1080ti", - DeviceIDs: []string{uuid.Generate()}, - } -} - -// nvidiaAlloc returns an allocation that has been assigned an nvidia device. -func nvidiaAlloc() *structs.Allocation { - a := mock.Alloc() - a.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ - nvidiaAllocatedDevice(), - } - return a -} - // devNode returns a node containing two devices, an nvidia gpu and an intel // FPGA. 
func devNode() *structs.Node { @@ -105,194 +86,6 @@ func collectInstanceIDs(devices ...*structs.NodeDeviceResource) []string { return out } -// Make sure that the device allocator works even if the node has no devices -func TestDeviceAllocator_AddAllocs_NoDeviceNode(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := mock.Node() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - // Create three allocations, one with a device, one without, and one - // terminal - a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() - allocs := []*structs.Allocation{a1, a2, a3} - a3.DesiredStatus = structs.AllocDesiredStatusStop - - require.False(d.AddAllocs(allocs)) - require.Len(d.devices, 0) -} - -// Add allocs to a node with a device -func TestDeviceAllocator_AddAllocs(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := devNode() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - // Create three allocations, one with a device, one without, and one - // terminal - a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() - - nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID - intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID - a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} - - allocs := []*structs.Allocation{a1, a2, a3} - a3.DesiredStatus = structs.AllocDesiredStatusStop - - require.False(d.AddAllocs(allocs)) - require.Len(d.devices, 2) - - // Check that we have two devices for nvidia and that one of them is used - nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] - require.True(ok) - require.Len(nvidiaDevice.instances, 2) - require.Contains(nvidiaDevice.instances, nvidiaDev0ID) - require.Equal(1, nvidiaDevice.instances[nvidiaDev0ID]) - - // Check only one instance of the intel device is set up since the other is - // unhealthy - intelDevice, ok := d.devices[*n.NodeResources.Devices[1].ID()] - require.True(ok) - require.Len(intelDevice.instances, 1) - require.Equal(0, intelDevice.instances[intelDev0ID]) -} - -// Add alloc with unknown ID to a node with devices. 
This tests that we can -// operate on previous allocs even if the device has changed to unhealthy and we -// don't track it -func TestDeviceAllocator_AddAllocs_UnknownID(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := devNode() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - // Create three allocations, one with a device, one without, and one - // terminal - a1, a2, a3 := mock.Alloc(), nvidiaAlloc(), mock.Alloc() - - // a2 will have a random ID since it is generated - - allocs := []*structs.Allocation{a1, a2, a3} - a3.DesiredStatus = structs.AllocDesiredStatusStop - - require.False(d.AddAllocs(allocs)) - require.Len(d.devices, 2) - - // Check that we have two devices for nvidia and that one of them is used - nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] - require.True(ok) - require.Len(nvidiaDevice.instances, 2) - for _, v := range nvidiaDevice.instances { - require.Equal(0, v) - } -} - -// Test that collision detection works -func TestDeviceAllocator_AddAllocs_Collision(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := devNode() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - // Create two allocations, both with the same device - a1, a2 := nvidiaAlloc(), nvidiaAlloc() - - nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID - a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} - a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} - - allocs := []*structs.Allocation{a1, a2} - require.True(d.AddAllocs(allocs)) -} - -// Make sure that the device allocator works even if the node has no devices -func TestDeviceAllocator_AddReserved_NoDeviceNode(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := mock.Node() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - require.False(d.AddReserved(nvidiaAllocatedDevice())) - require.Len(d.devices, 0) -} - -// Add reserved to a node with a device -func TestDeviceAllocator_AddReserved(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := devNode() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID - intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID - - res := nvidiaAllocatedDevice() - res.DeviceIDs = []string{nvidiaDev0ID} - - require.False(d.AddReserved(res)) - require.Len(d.devices, 2) - - // Check that we have two devices for nvidia and that one of them is used - nvidiaDevice, ok := d.devices[*n.NodeResources.Devices[0].ID()] - require.True(ok) - require.Len(nvidiaDevice.instances, 2) - require.Contains(nvidiaDevice.instances, nvidiaDev0ID) - require.Equal(1, nvidiaDevice.instances[nvidiaDev0ID]) - - // Check only one instance of the intel device is set up since the other is - // unhealthy - intelDevice, ok := d.devices[*n.NodeResources.Devices[1].ID()] - require.True(ok) - require.Len(intelDevice.instances, 1) - require.Equal(0, intelDevice.instances[intelDev0ID]) -} - -// Test that collision detection works -func TestDeviceAllocator_AddReserved_Collision(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := devNode() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID - - // Create an alloc with nvidia - a1 := nvidiaAlloc() - a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} - require.False(d.AddAllocs([]*structs.Allocation{a1})) - - // Reserve the 
same device - res := nvidiaAllocatedDevice() - res.DeviceIDs = []string{nvidiaDev0ID} - require.True(d.AddReserved(res)) -} - -// Test that asking for a device on a node with no devices doesn't work -func TestDeviceAllocator_Allocate_NoDeviceNode(t *testing.T) { - require := require.New(t) - _, ctx := testContext(t) - n := mock.Node() - d := newDeviceAllocator(ctx, n) - require.NotNil(d) - - // Build the request - ask := deviceRequest("nvidia/gpu", 1, nil, nil) - - out, score, err := d.AssignDevice(ask) - require.Nil(out) - require.Error(err) - require.Zero(score) - require.Contains(err.Error(), "no devices available") -} - // Test that asking for a device that isn't fully specified works. func TestDeviceAllocator_Allocate_GenericRequest(t *testing.T) { require := require.New(t) From e3cbb2c82e3eb3b9a98115a02f6208066a18d722 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 17 Oct 2018 15:03:38 -0700 Subject: [PATCH 5/6] allocs fit checks if devices get oversubscribed --- nomad/plan_apply.go | 5 +-- nomad/plan_apply_test.go | 59 +++++++++++++++++++++++++ nomad/structs/devices.go | 72 +++++++++++++++---------------- nomad/structs/funcs.go | 16 +++++-- nomad/structs/funcs_test.go | 86 ++++++++++++++++++++++++++++++++----- scheduler/rank.go | 4 +- 6 files changed, 187 insertions(+), 55 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 95dd007f0..28a2b14db 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -5,10 +5,9 @@ import ( "runtime" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -536,6 +535,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri proposed = append(proposed, plan.NodeAllocation[nodeID]...) 
// Check if these allocations fit - fit, reason, _, err := structs.AllocsFit(node, proposed, nil) + fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true) return fit, reason, err } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 1b5142aa9..76ed0ad89 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -640,6 +640,65 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) { } } +// Test that we detect device oversubscription +func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) { + t.Parallel() + alloc := mock.Alloc() + state := testStateStore(t) + node := mock.NvidiaNode() + node.ReservedResources = nil + + nvidia0 := node.NodeResources.Devices[0].Instances[0].ID + + // Have the allocation use a Nvidia device + alloc.NodeID = node.ID + alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) + state.UpsertNode(1000, node) + state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + + // Alloc2 tries to use the same device + alloc2 := mock.Alloc() + alloc2.AllocatedResources.Tasks["web"].Networks = nil + alloc2.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + alloc2.NodeID = node.ID + state.UpsertJobSummary(1200, mock.JobSummary(alloc2.JobID)) + + snap, _ := state.Snapshot() + plan := &structs.Plan{ + Job: alloc.Job, + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc2}, + }, + } + + fit, reason, err := evaluateNodePlan(snap, plan, node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if fit { + t.Fatalf("bad") + } + if reason != "device oversubscribed" { + t.Fatalf("bad: %q", reason) + } +} + func TestPlanApply_EvalNodePlan_UpdateExisting(t *testing.T) { t.Parallel() alloc := mock.Alloc() diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go index 6f179f42c..5ee537e04 100644 --- a/nomad/structs/devices.go +++ b/nomad/structs/devices.go @@ -18,6 +18,42 @@ type DeviceAccounterInstance struct { Instances map[string]int } +// NewDeviceAccounter returns a new device accounter. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. +func NewDeviceAccounter(n *Node) *DeviceAccounter { + numDevices := 0 + var devices []*NodeDeviceResource + + // COMPAT(0.11): Remove in 0.11 + if n.NodeResources != nil { + numDevices = len(n.NodeResources.Devices) + devices = n.NodeResources.Devices + } + + d := &DeviceAccounter{ + Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), + } + + for _, dev := range devices { + id := *dev.ID() + d.Devices[id] = &DeviceAccounterInstance{ + Device: dev, + Instances: make(map[string]int, len(dev.Instances)), + } + for _, instance := range dev.Instances { + // Skip unhealthy devices as they aren't allocatable + if !instance.Healthy { + continue + } + + d.Devices[id].Instances[instance.ID] = 0 + } + } + + return d +} + // AddAllocs takes a set of allocations and internally marks which devices are // used. If a device is used more than once by the set of passed allocations, // the collision will be returned as true. @@ -92,39 +128,3 @@ func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision b return } - -// NewDeviceAccounter returns a new device accounter. 
The node is used to -// populate the set of available devices based on what healthy device instances -// exist on the node. -func NewDeviceAccounter(n *Node) *DeviceAccounter { - numDevices := 0 - var devices []*NodeDeviceResource - - // COMPAT(0.11): Remove in 0.11 - if n.NodeResources != nil { - numDevices = len(n.NodeResources.Devices) - devices = n.NodeResources.Devices - } - - d := &DeviceAccounter{ - Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), - } - - for _, dev := range devices { - id := *dev.ID() - d.Devices[id] = &DeviceAccounterInstance{ - Device: dev, - Instances: make(map[string]int, len(dev.Instances)), - } - for _, instance := range dev.Instances { - // Skip unhealthy devices as they aren't allocatable - if !instance.Healthy { - continue - } - - d.Devices[id].Instances[instance.ID] = 0 - } - } - - return d -} diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 7493596f8..f79cb6c06 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -10,11 +10,10 @@ import ( "strconv" "strings" - "golang.org/x/crypto/blake2b" - multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/acl" + "golang.org/x/crypto/blake2b" ) // MergeMultierrorWarnings takes job warnings and canonicalize warnings and @@ -98,8 +97,9 @@ func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allo // AllocsFit checks if a given set of allocations will fit on a node. // The netIdx can optionally be provided if its already been computed. // If the netIdx is provided, it is assumed that the client has already -// ensured there are no collisions. -func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, string, *ComparableResources, error) { +// ensured there are no collisions. If checkDevices is set to true, we check if +// there is a device oversubscription. +func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) { // Compute the utilization from zero used := new(ComparableResources) @@ -136,6 +136,14 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st return false, "bandwidth exceeded", used, nil } + // Check devices + if checkDevices { + accounter := NewDeviceAccounter(node) + if accounter.AddAllocs(allocs) { + return false, "device oversubscribed", used, nil + } + } + // Allocations fit! 
return true, "", used, nil } diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 01571ba34..db81c67b5 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -116,7 +116,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should fit one allocation - fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -125,7 +125,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should not fit second allocation - fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -186,7 +186,7 @@ func TestAllocsFit_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -195,7 +195,7 @@ func TestAllocsFit_Old(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -256,7 +256,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -267,7 +267,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit) @@ -341,7 +341,7 @@ func TestAllocsFit(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -350,7 +350,7 @@ func TestAllocsFit(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -425,7 +425,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -436,7 +436,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit, dim) @@ -445,6 +445,72 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) } +// Tests that AllocsFit detects device collisions +func TestAllocsFit_Devices(t *testing.T) { + require := require.New(t) + + n := MockNvidiaNode() + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: 
map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + }, + } + a2 := a1.Copy() + a2.AllocatedResources.Tasks["web"] = &AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, // Use the same ID + }, + }, + } + + // Should fit one allocation + fit, _, _, err := AllocsFit(n, []*Allocation{a1}, nil, true) + require.NoError(err) + require.True(fit) + + // Should not fit second allocation + fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a1}, nil, true) + require.NoError(err) + require.False(fit) + require.Equal("device oversubscribed", msg) + + // Should not fit second allocation but won't detect since we disabled + // devices + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) + require.NoError(err) + require.True(fit) +} + // COMPAT(0.11): Remove in 0.11 func TestScoreFit_Old(t *testing.T) { node := &Node{} diff --git a/scheduler/rank.go b/scheduler/rank.go index e313f1d6b..55d45c995 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -313,8 +313,8 @@ OUTER: // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) - // Check if these allocations fit - fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx) + // Check if these allocations fit, if they do not, simply skip this node + fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx, false) netIdx.Release() if !fit { // Skip the node if evictions are not enabled From 36abd3a3d878a67a601225f9479dab063ac453f5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 31 Oct 2018 13:57:43 -0700 Subject: [PATCH 6/6] review comments --- nomad/plan_apply_test.go | 13 ++++--------- nomad/structs/devices.go | 3 ++- nomad/structs/funcs_test.go | 4 ++-- scheduler/device.go | 5 ++++- scheduler/rank.go | 10 ++++++---- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 76ed0ad89..4dfa7a43b 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -643,6 +643,7 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) { // Test that we detect device oversubscription func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) { t.Parallel() + require := require.New(t) alloc := mock.Alloc() state := testStateStore(t) node := mock.NvidiaNode() @@ -688,15 +689,9 @@ func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) { } fit, reason, err := evaluateNodePlan(snap, plan, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if fit { - t.Fatalf("bad") - } - if reason != "device oversubscribed" { - t.Fatalf("bad: %q", reason) - } + require.NoError(err) + require.False(fit) + require.Equal("device oversubscribed", reason) } func TestPlanApply_EvalNodePlan_UpdateExisting(t *testing.T) { diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go index 5ee537e04..9ad363454 100644 --- a/nomad/structs/devices.go +++ b/nomad/structs/devices.go @@ -4,6 +4,7 @@ package structs 
// when a node is oversubscribed and can be used for deciding what devices are // free type DeviceAccounter struct { + // Devices maps a device group to its device accounter instance Devices map[DeviceIdTuple]*DeviceAccounterInstance } @@ -13,7 +14,7 @@ type DeviceAccounterInstance struct { // Device is the device being wrapped Device *NodeDeviceResource - // Instances is a mapping of the device IDs of the instances to their usage. + // Instances is a mapping of the device IDs to their usage. // Only a value of 0 indicates that the instance is unused. Instances map[string]int } diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index db81c67b5..1408c49f0 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -499,14 +499,14 @@ func TestAllocsFit_Devices(t *testing.T) { require.True(fit) // Should not fit second allocation - fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a1}, nil, true) + fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a2}, nil, true) require.NoError(err) require.False(fit) require.Equal("device oversubscribed", msg) // Should not fit second allocation but won't detect since we disabled // devices - fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit) } diff --git a/scheduler/device.go b/scheduler/device.go index a922bee10..9375a4000 100644 --- a/scheduler/device.go +++ b/scheduler/device.go @@ -64,6 +64,7 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc // Score the choice var choiceScore float64 if l := len(ask.Affinities); l != 0 { + totalWeight := 0.0 for _, a := range ask.Affinities { // Resolve the targets lVal, ok := resolveDeviceTarget(a.LTarget, devInst.Device) @@ -75,6 +76,8 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc continue } + totalWeight += a.Weight + // Check if satisfied if !checkAttributeAffinity(d.ctx, a.Operand, lVal, rVal) { continue @@ -83,7 +86,7 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc } // normalize - choiceScore /= float64(l) + choiceScore /= totalWeight } // Only use the device if it is a higher score than we have already seen diff --git a/scheduler/rank.go b/scheduler/rank.go index 55d45c995..e8ee27ae8 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -196,7 +196,7 @@ OUTER: devAllocator.AddAllocs(proposed) // Track the affinities of the devices - devicesWithAffinities := 0 + totalDeviceAffinityWeight := 0.0 deviceAffinityScore := 0.0 // Assign the resources for each task @@ -295,7 +295,9 @@ OUTER: // Add the scores if len(req.Affinities) != 0 { - devicesWithAffinities++ + for _, a := range req.Affinities { + totalDeviceAffinityWeight += a.Weight + } deviceAffinityScore += score } } @@ -350,8 +352,8 @@ OUTER: iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit) // Score the device affinity - if devicesWithAffinities != 0 { - deviceAffinityScore /= float64(devicesWithAffinities) + if totalDeviceAffinityWeight != 0 { + deviceAffinityScore /= totalDeviceAffinityWeight option.Scores = append(option.Scores, deviceAffinityScore) iter.ctx.Metrics().ScoreNode(option.Node, "devices", deviceAffinityScore) }
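
One subtlety in this final revision: device affinity scores are now normalized by the summed weights of the affinities whose attribute targets resolved, rather than by a raw count, both inside AssignDevice and in the rank iterator's aggregate "devices" score. Assuming the unshown accumulation is choiceScore += a.Weight for each satisfied affinity (which the normalization implies), a hypothetical ask with two resolvable affinities of weight 50 and 100, where only the weight-100 affinity is satisfied, illustrates the difference:

	// Hypothetical weights; only the second affinity is satisfied.
	totalWeight := 50.0 + 100.0
	choiceScore := 100.0
	normalized := choiceScore / totalWeight // ~0.67, versus 100/2 = 50 under the old count-based division
	_ = normalized

The score therefore stays proportional to the weight actually satisfied instead of being inflated by how many affinities were written or how large their weights are.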