diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 3b90ee15d..04ce66ecf 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) func Node() *structs.Node { @@ -91,6 +92,36 @@ func Node() *structs.Node { return node } +// NvidiaNode returns a node with two instances of an Nvidia GPU +func NvidiaNode() *structs.Node { + n := Node() + n.NodeResources.Devices = []*structs.NodeDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }, + } + n.ComputeClass() + return n +} + func HCL() string { return `job "my-job" { datacenters = ["dc1"] diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 95dd007f0..28a2b14db 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -5,10 +5,9 @@ import ( "runtime" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -536,6 +535,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri proposed = append(proposed, plan.NodeAllocation[nodeID]...) 
// Check if these allocations fit - fit, reason, _, err := structs.AllocsFit(node, proposed, nil) + fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true) return fit, reason, err } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 1b5142aa9..4dfa7a43b 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -640,6 +640,60 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) { } } +// Test that we detect device oversubscription +func TestPlanApply_EvalNodePlan_NodeFull_Device(t *testing.T) { + t.Parallel() + require := require.New(t) + alloc := mock.Alloc() + state := testStateStore(t) + node := mock.NvidiaNode() + node.ReservedResources = nil + + nvidia0 := node.NodeResources.Devices[0].Instances[0].ID + + // Have the allocation use a Nvidia device + alloc.NodeID = node.ID + alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) + state.UpsertNode(1000, node) + state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + + // Alloc2 tries to use the same device + alloc2 := mock.Alloc() + alloc2.AllocatedResources.Tasks["web"].Networks = nil + alloc2.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidia0}, + }, + } + alloc2.NodeID = node.ID + state.UpsertJobSummary(1200, mock.JobSummary(alloc2.JobID)) + + snap, _ := state.Snapshot() + plan := &structs.Plan{ + Job: alloc.Job, + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc2}, + }, + } + + fit, reason, err := evaluateNodePlan(snap, plan, node.ID) + require.NoError(err) + require.False(fit) + require.Equal("device oversubscribed", reason) +} + func TestPlanApply_EvalNodePlan_UpdateExisting(t *testing.T) { t.Parallel() alloc := mock.Alloc() diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go new file mode 100644 index 000000000..9ad363454 --- /dev/null +++ b/nomad/structs/devices.go @@ -0,0 +1,131 @@ +package structs + +// DeviceAccounter is used to account for device usage on a node. It can detect +// when a node is oversubscribed and can be used for deciding what devices are +// free +type DeviceAccounter struct { + // Devices maps a device group to its device accounter instance + Devices map[DeviceIdTuple]*DeviceAccounterInstance +} + +// DeviceAccounterInstance wraps a device and adds tracking to the instances of +// the device to determine if they are free or not. +type DeviceAccounterInstance struct { + // Device is the device being wrapped + Device *NodeDeviceResource + + // Instances is a mapping of the device IDs to their usage. + // Only a value of 0 indicates that the instance is unused. + Instances map[string]int +} + +// NewDeviceAccounter returns a new device accounter. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. 
+func NewDeviceAccounter(n *Node) *DeviceAccounter { + numDevices := 0 + var devices []*NodeDeviceResource + + // COMPAT(0.11): Remove in 0.11 + if n.NodeResources != nil { + numDevices = len(n.NodeResources.Devices) + devices = n.NodeResources.Devices + } + + d := &DeviceAccounter{ + Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, numDevices), + } + + for _, dev := range devices { + id := *dev.ID() + d.Devices[id] = &DeviceAccounterInstance{ + Device: dev, + Instances: make(map[string]int, len(dev.Instances)), + } + for _, instance := range dev.Instances { + // Skip unhealthy devices as they aren't allocatable + if !instance.Healthy { + continue + } + + d.Devices[id].Instances[instance.ID] = 0 + } + } + + return d +} + +// AddAllocs takes a set of allocations and internally marks which devices are +// used. If a device is used more than once by the set of passed allocations, +// the collision will be returned as true. +func (d *DeviceAccounter) AddAllocs(allocs []*Allocation) (collision bool) { + for _, a := range allocs { + // Filter any terminal allocation + if a.TerminalStatus() { + continue + } + + // COMPAT(0.11): Remove in 0.11 + // If the alloc doesn't have the new style resources, it can't have + // devices + if a.AllocatedResources == nil { + continue + } + + // Go through each task resource + for _, tr := range a.AllocatedResources.Tasks { + + // Go through each assigned device group + for _, device := range tr.Devices { + devID := device.ID() + + // Go through each assigned device + for _, instanceID := range device.DeviceIDs { + + // Mark that we are using the device. It may not be in the + // map if the device is no longer being fingerprinted, is + // unhealthy, etc. + if devInst, ok := d.Devices[*devID]; ok { + if i, ok := devInst.Instances[instanceID]; ok { + // Mark that the device is in use + devInst.Instances[instanceID]++ + + if i != 0 { + collision = true + } + } + } + } + } + } + } + + return +} + +// AddReserved marks the device instances in the passed device reservation as +// used and returns if there is a collision. +func (d *DeviceAccounter) AddReserved(res *AllocatedDeviceResource) (collision bool) { + // Lookup the device. + devInst, ok := d.Devices[*res.ID()] + if !ok { + return false + } + + // For each reserved instance, mark it as used + for _, id := range res.DeviceIDs { + cur, ok := devInst.Instances[id] + if !ok { + continue + } + + // It has already been used, so mark that there is a collision + if cur != 0 { + collision = true + } + + devInst.Instances[id]++ + } + + return +} diff --git a/nomad/structs/devices_test.go b/nomad/structs/devices_test.go new file mode 100644 index 000000000..013b6fcec --- /dev/null +++ b/nomad/structs/devices_test.go @@ -0,0 +1,216 @@ +package structs + +import ( + "testing" + + "github.com/hashicorp/nomad/helper/uuid" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/require" +) + +// nvidiaAllocatedDevice returns an allocated nvidia device +func nvidiaAllocatedDevice() *AllocatedDeviceResource { + return &AllocatedDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{uuid.Generate()}, + } +} + +// nvidiaAlloc returns an allocation that has been assigned an nvidia device. +func nvidiaAlloc() *Allocation { + a := MockAlloc() + a.AllocatedResources.Tasks["web"].Devices = []*AllocatedDeviceResource{ + nvidiaAllocatedDevice(), + } + return a +} + +// devNode returns a node containing two devices, an nvidia gpu and an intel +// FPGA. 
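The accounting model above reduces to a per-instance use counter in which only a count of zero means free: AddAllocs bumps the counter for every claimed instance ID and reports a collision whenever the previous value was already non-zero, untracked IDs (unhealthy or no-longer-fingerprinted instances) are silently skipped, and AddReserved applies the same rule when an offer is committed back into the accounter. The standalone sketch below illustrates just that counting rule; the instanceUse/markUsed names are illustrative and not part of this change.

```go
package main

import "fmt"

// instanceUse mirrors the core idea of DeviceAccounterInstance.Instances:
// a map from device instance ID to a use count, where only 0 means free.
type instanceUse map[string]int

// markUsed bumps the counter for an instance and reports whether the
// instance already had a user (i.e. this claim is an oversubscription).
func markUsed(u instanceUse, id string) (collision bool) {
	prev, ok := u[id]
	if !ok {
		// Untracked IDs are ignored, matching how the accounter skips
		// instances that are unhealthy or no longer fingerprinted.
		return false
	}
	u[id] = prev + 1
	return prev != 0
}

func main() {
	use := instanceUse{"gpu-0": 0, "gpu-1": 0}

	fmt.Println(markUsed(use, "gpu-0")) // false: first user of gpu-0
	fmt.Println(markUsed(use, "gpu-0")) // true: gpu-0 is now oversubscribed
	fmt.Println(markUsed(use, "gpu-2")) // false: unknown instance is skipped
}
```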
+func devNode() *Node { + n := MockNvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &NodeDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + }, + Instances: []*NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: false, + }, + }, + }) + return n +} + +// Make sure that the device accounter works even if the node has no devices +func TestDeviceAccounter_AddAllocs_NoDeviceNode(t *testing.T) { + require := require.New(t) + n := MockNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 0) +} + +// Add allocs to a node with a device +func TestDeviceAccounter_AddAllocs(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.Instances, 2) + require.Contains(nvidiaDevice.Instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.Instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.Devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.Instances, 1) + require.Equal(0, intelDevice.Instances[intelDev0ID]) +} + +// Add alloc with unknown ID to a node with devices. 
This tests that we can +// operate on previous allocs even if the device has changed to unhealthy and we +// don't track it +func TestDeviceAccounter_AddAllocs_UnknownID(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create three allocations, one with a device, one without, and one + // terminal + a1, a2, a3 := MockAlloc(), nvidiaAlloc(), MockAlloc() + + // a2 will have a random ID since it is generated + + allocs := []*Allocation{a1, a2, a3} + a3.DesiredStatus = AllocDesiredStatusStop + + require.False(d.AddAllocs(allocs)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.Instances, 2) + for _, v := range nvidiaDevice.Instances { + require.Equal(0, v) + } +} + +// Test that collision detection works +func TestDeviceAccounter_AddAllocs_Collision(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + // Create two allocations, both with the same device + a1, a2 := nvidiaAlloc(), nvidiaAlloc() + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + + allocs := []*Allocation{a1, a2} + require.True(d.AddAllocs(allocs)) +} + +// Make sure that the device allocator works even if the node has no devices +func TestDeviceAccounter_AddReserved_NoDeviceNode(t *testing.T) { + require := require.New(t) + n := MockNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + require.False(d.AddReserved(nvidiaAllocatedDevice())) + require.Len(d.Devices, 0) +} + +// Add reserved to a node with a device +func TestDeviceAccounter_AddReserved(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + intelDev0ID := n.NodeResources.Devices[1].Instances[0].ID + + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + + require.False(d.AddReserved(res)) + require.Len(d.Devices, 2) + + // Check that we have two devices for nvidia and that one of them is used + nvidiaDevice, ok := d.Devices[*n.NodeResources.Devices[0].ID()] + require.True(ok) + require.Len(nvidiaDevice.Instances, 2) + require.Contains(nvidiaDevice.Instances, nvidiaDev0ID) + require.Equal(1, nvidiaDevice.Instances[nvidiaDev0ID]) + + // Check only one instance of the intel device is set up since the other is + // unhealthy + intelDevice, ok := d.Devices[*n.NodeResources.Devices[1].ID()] + require.True(ok) + require.Len(intelDevice.Instances, 1) + require.Equal(0, intelDevice.Instances[intelDev0ID]) +} + +// Test that collision detection works +func TestDeviceAccounter_AddReserved_Collision(t *testing.T) { + require := require.New(t) + n := devNode() + d := NewDeviceAccounter(n) + require.NotNil(d) + + nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID + + // Create an alloc with nvidia + a1 := nvidiaAlloc() + a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID} + require.False(d.AddAllocs([]*Allocation{a1})) + + // Reserve the same device + res := nvidiaAllocatedDevice() + res.DeviceIDs = []string{nvidiaDev0ID} + require.True(d.AddReserved(res)) +} diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 
7493596f8..f79cb6c06 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -10,11 +10,10 @@ import ( "strconv" "strings" - "golang.org/x/crypto/blake2b" - multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/acl" + "golang.org/x/crypto/blake2b" ) // MergeMultierrorWarnings takes job warnings and canonicalize warnings and @@ -98,8 +97,9 @@ func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allo // AllocsFit checks if a given set of allocations will fit on a node. // The netIdx can optionally be provided if its already been computed. // If the netIdx is provided, it is assumed that the client has already -// ensured there are no collisions. -func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, string, *ComparableResources, error) { +// ensured there are no collisions. If checkDevices is set to true, we check if +// there is a device oversubscription. +func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) { // Compute the utilization from zero used := new(ComparableResources) @@ -136,6 +136,14 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st return false, "bandwidth exceeded", used, nil } + // Check devices + if checkDevices { + accounter := NewDeviceAccounter(node) + if accounter.AddAllocs(allocs) { + return false, "device oversubscribed", used, nil + } + } + // Allocations fit! return true, "", used, nil } diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 01571ba34..1408c49f0 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -116,7 +116,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should fit one allocation - fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, dim, _, err := AllocsFit(n, []*Allocation{a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -125,7 +125,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) { } // Should not fit second allocation - fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) if err != nil { t.Fatalf("err: %v", err) } @@ -186,7 +186,7 @@ func TestAllocsFit_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -195,7 +195,7 @@ func TestAllocsFit_Old(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -256,7 +256,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -267,7 +267,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit) @@ -341,7 +341,7 @@ func TestAllocsFit(t 
*testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -350,7 +350,7 @@ func TestAllocsFit(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) // Should not fit second allocation - fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil) + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) require.NoError(err) require.False(fit) @@ -425,7 +425,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil) + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) require.True(fit) @@ -436,7 +436,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { // Should fit second allocation since it is terminal a2 := a1.Copy() a2.DesiredStatus = AllocDesiredStatusStop - fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil) + fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil, false) require.NoError(err) require.True(fit, dim) @@ -445,6 +445,72 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) { require.EqualValues(2048, used.Flattened.Memory.MemoryMB) } +// Tests that AllocsFit detects device collisions +func TestAllocsFit_Devices(t *testing.T) { + require := require.New(t) + + n := MockNvidiaNode() + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + }, + } + a2 := a1.Copy() + a2.AllocatedResources.Tasks["web"] = &AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Devices: []*AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{n.NodeResources.Devices[0].Instances[0].ID}, // Use the same ID + }, + }, + } + + // Should fit one allocation + fit, _, _, err := AllocsFit(n, []*Allocation{a1}, nil, true) + require.NoError(err) + require.True(fit) + + // Should not fit second allocation + fit, msg, _, err := AllocsFit(n, []*Allocation{a1, a2}, nil, true) + require.NoError(err) + require.False(fit) + require.Equal("device oversubscribed", msg) + + // Should not fit second allocation but won't detect since we disabled + // devices + fit, _, _, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false) + require.NoError(err) + require.True(fit) +} + // COMPAT(0.11): Remove in 0.11 func TestScoreFit_Old(t *testing.T) { node := &Node{} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d5665f9b8..f999739ae 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2388,6 +2388,17 @@ func (id *DeviceIdTuple) Matches(other *DeviceIdTuple) bool { return true } +// Equals returns if this Device ID is the same as the passed ID. 
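The extra boolean on AllocsFit exists because its two callers want different behavior: the plan applier in nomad/plan_apply.go passes true so the servers reject a plan that would double-book a device instance, while the bin packer in scheduler/rank.go passes false because it has already accounted for devices through its own device allocator. The test-style sketch below shows both conventions against the mock Nvidia node; it is a minimal example that assumes a Nomad source tree with this change applied.

```go
package structs_test

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

func TestAllocsFit_CheckDevicesFlag(t *testing.T) {
	require := require.New(t)

	n := structs.MockNvidiaNode()
	instID := n.NodeResources.Devices[0].Instances[0].ID

	alloc := &structs.Allocation{
		AllocatedResources: &structs.AllocatedResources{
			Tasks: map[string]*structs.AllocatedTaskResources{
				"web": {
					Cpu:    structs.AllocatedCpuResources{CpuShares: 500},
					Memory: structs.AllocatedMemoryResources{MemoryMB: 256},
					Devices: []*structs.AllocatedDeviceResource{
						{Vendor: "nvidia", Type: "gpu", Name: "1080ti", DeviceIDs: []string{instID}},
					},
				},
			},
			Shared: structs.AllocatedSharedResources{DiskMB: 100},
		},
	}

	// Two allocations claim the same device instance.
	pair := []*structs.Allocation{alloc, alloc.Copy()}

	// Plan-apply path: the device check is on and the collision is caught.
	fit, reason, _, err := structs.AllocsFit(n, pair, nil, true)
	require.NoError(err)
	require.False(fit)
	require.Equal("device oversubscribed", reason)

	// Bin-packing path: the device check is off, so only CPU, memory, disk
	// and ports are validated and the same pair is reported as fitting.
	fit, _, _, err = structs.AllocsFit(n, pair, nil, false)
	require.NoError(err)
	require.True(fit)
}
```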
+func (id *DeviceIdTuple) Equals(o *DeviceIdTuple) bool { + if id == nil && o == nil { + return true + } else if id == nil || o == nil { + return false + } + + return o.Vendor == id.Vendor && o.Type == id.Type && o.Name == id.Name +} + // NodeDeviceResource captures a set of devices sharing a common // vendor/type/device_name tuple. type NodeDeviceResource struct { @@ -2694,6 +2705,7 @@ type AllocatedTaskResources struct { Cpu AllocatedCpuResources Memory AllocatedMemoryResources Networks Networks + Devices []*AllocatedDeviceResource } func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { @@ -2702,6 +2714,8 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { } newA := new(AllocatedTaskResources) *newA = *a + + // Copy the networks if a.Networks != nil { n := len(a.Networks) newA.Networks = make([]*NetworkResource, n) @@ -2709,6 +2723,16 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { newA.Networks[i] = a.Networks[i].Copy() } } + + // Copy the devices + if newA.Devices != nil { + n := len(a.Devices) + newA.Devices = make([]*AllocatedDeviceResource, n) + for i := 0; i < n; i++ { + newA.Devices[i] = a.Devices[i].Copy() + } + } + return newA } @@ -2734,6 +2758,16 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) { a.Networks[idx].Add(n) } } + + for _, d := range delta.Devices { + // Find the matching device + idx := AllocatedDevices(a.Devices).Index(d) + if idx == -1 { + a.Devices = append(a.Devices, d.Copy()) + } else { + a.Devices[idx].Add(d) + } + } } // Comparable turns AllocatedTaskResources into ComparableResources @@ -2831,6 +2865,72 @@ func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) { a.MemoryMB -= delta.MemoryMB } +type AllocatedDevices []*AllocatedDeviceResource + +// Index finds the matching index using the passed device. If not found, -1 is +// returned. +func (a AllocatedDevices) Index(d *AllocatedDeviceResource) int { + if d == nil { + return -1 + } + + for i, o := range a { + if o.ID().Equals(d.ID()) { + return i + } + } + + return -1 +} + +// AllocatedDeviceResource captures a set of allocated devices. +type AllocatedDeviceResource struct { + // Vendor, Type, and Name are used to select the plugin to request the + // device IDs from. + Vendor string + Type string + Name string + + // DeviceIDs is the set of allocated devices + DeviceIDs []string +} + +func (a *AllocatedDeviceResource) ID() *DeviceIdTuple { + if a == nil { + return nil + } + + return &DeviceIdTuple{ + Vendor: a.Vendor, + Type: a.Type, + Name: a.Name, + } +} + +func (a *AllocatedDeviceResource) Add(delta *AllocatedDeviceResource) { + if delta == nil { + return + } + + a.DeviceIDs = append(a.DeviceIDs, delta.DeviceIDs...) +} + +func (a *AllocatedDeviceResource) Copy() *AllocatedDeviceResource { + if a == nil { + return a + } + + na := *a + + // Copy the devices + na.DeviceIDs = make([]string, len(a.DeviceIDs)) + for i, id := range a.DeviceIDs { + na.DeviceIDs[i] = id + } + + return &na +} + // ComparableResources is the set of resources allocated to a task group but // not keyed by Task, making it easier to compare. 
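Because allocated devices are keyed by the vendor/type/name tuple, AllocatedTaskResources.Add folds a delta's device IDs into the existing group when AllocatedDevices.Index finds a match and appends a copy otherwise. A small example of that merge, assuming a Nomad source tree with this change applied (the instance IDs are placeholders):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	web := &structs.AllocatedTaskResources{
		Devices: []*structs.AllocatedDeviceResource{
			{Vendor: "nvidia", Type: "gpu", Name: "1080ti", DeviceIDs: []string{"id-0"}},
		},
	}

	delta := &structs.AllocatedTaskResources{
		Devices: []*structs.AllocatedDeviceResource{
			// Same vendor/type/name group: the ID is appended to the existing entry.
			{Vendor: "nvidia", Type: "gpu", Name: "1080ti", DeviceIDs: []string{"id-1"}},
			// New group: appended to the task's devices as a copy.
			{Vendor: "intel", Type: "fpga", Name: "F100", DeviceIDs: []string{"id-2"}},
		},
	}

	web.Add(delta)

	// Prints two groups: the 1080ti group holding id-0 and id-1, and the
	// F100 group holding id-2.
	for _, d := range web.Devices {
		fmt.Println(d.Vendor, d.Type, d.Name, d.DeviceIDs)
	}
}
```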
type ComparableResources struct { diff --git a/nomad/structs/testing.go b/nomad/structs/testing.go index bbe2a2e58..7f4435366 100644 --- a/nomad/structs/testing.go +++ b/nomad/structs/testing.go @@ -1,5 +1,13 @@ package structs +import ( + "fmt" + "time" + + "github.com/hashicorp/nomad/helper/uuid" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" +) + // NodeResourcesToAllocatedResources converts a node resources to an allocated // resources. The task name used is "web" and network is omitted. This is // useful when trying to make an allocation fill an entire node. @@ -24,3 +32,240 @@ func NodeResourcesToAllocatedResources(n *NodeResources) *AllocatedResources { }, } } + +func MockNode() *Node { + node := &Node{ + ID: uuid.Generate(), + SecretID: uuid.Generate(), + Datacenter: "dc1", + Name: "foobar", + Attributes: map[string]string{ + "kernel.name": "linux", + "arch": "x86", + "nomad.version": "0.5.0", + "driver.exec": "1", + "driver.mock_driver": "1", + }, + NodeResources: &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: 4000, + }, + Memory: NodeMemoryResources{ + MemoryMB: 8192, + }, + Disk: NodeDiskResources{ + DiskMB: 100 * 1024, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Cpu: NodeReservedCpuResources{ + CpuShares: 100, + }, + Memory: NodeReservedMemoryResources{ + MemoryMB: 256, + }, + Disk: NodeReservedDiskResources{ + DiskMB: 4 * 1024, + }, + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: "22", + }, + }, + Links: map[string]string{ + "consul": "foobar.dc1", + }, + Meta: map[string]string{ + "pci-dss": "true", + "database": "mysql", + "version": "5.6", + }, + NodeClass: "linux-medium-pci", + Status: NodeStatusReady, + SchedulingEligibility: NodeSchedulingEligible, + } + node.ComputeClass() + return node +} + +// NvidiaNode returns a node with two instances of an Nvidia GPU +func MockNvidiaNode() *Node { + n := MockNode() + n.NodeResources.Devices = []*NodeDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(3584, ""), + "graphics_clock": psstructs.NewIntAttribute(1480, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), + }, + Instances: []*NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }, + } + n.ComputeClass() + return n +} + +func MockJob() *Job { + job := &Job{ + Region: "global", + ID: fmt.Sprintf("mock-service-%s", uuid.Generate()), + Name: "my-job", + Namespace: DefaultNamespace, + Type: JobTypeService, + Priority: 50, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*Constraint{ + { + LTarget: "${attr.kernel.name}", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*TaskGroup{ + { + Name: "web", + Count: 10, + EphemeralDisk: &EphemeralDisk{ + SizeMB: 150, + }, + RestartPolicy: &RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + Delay: 1 * time.Minute, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 2, + Interval: 10 * time.Minute, + Delay: 5 * time.Second, + DelayFunction: "constant", + }, + Migrate: DefaultMigrateStrategy(), + Tasks: []*Task{ + { + Name: "web", + Driver: "exec", + Config: map[string]interface{}{ + "command": "/bin/date", + }, + Env: 
map[string]string{ + "FOO": "bar", + }, + Services: []*Service{ + { + Name: "${TASK}-frontend", + PortLabel: "http", + Tags: []string{"pci:${meta.pci-dss}", "datacenter:${node.datacenter}"}, + Checks: []*ServiceCheck{ + { + Name: "check-table", + Type: ServiceCheckScript, + Command: "/usr/local/check-table-${meta.database}", + Args: []string{"${meta.version}"}, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + }, + }, + }, + { + Name: "${TASK}-admin", + PortLabel: "admin", + }, + }, + LogConfig: DefaultLogConfig(), + Resources: &Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + { + MBits: 50, + DynamicPorts: []Port{ + {Label: "http"}, + {Label: "admin"}, + }, + }, + }, + }, + Meta: map[string]string{ + "foo": "bar", + }, + }, + }, + Meta: map[string]string{ + "elb_check_type": "http", + "elb_check_interval": "30s", + "elb_check_min": "3", + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + Status: JobStatusPending, + Version: 0, + CreateIndex: 42, + ModifyIndex: 99, + JobModifyIndex: 99, + } + job.Canonicalize() + return job +} + +func MockAlloc() *Allocation { + alloc := &Allocation{ + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: "12345678-abcd-efab-cdef-123456789abc", + Namespace: DefaultNamespace, + TaskGroup: "web", + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 256, + }, + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 150, + }, + }, + Job: MockJob(), + DesiredStatus: AllocDesiredStatusRun, + ClientStatus: AllocClientStatusPending, + } + alloc.JobID = alloc.Job.ID + return alloc +} diff --git a/plugins/shared/structs/units.go b/plugins/shared/structs/units.go index 597be99c6..4310b8443 100644 --- a/plugins/shared/structs/units.go +++ b/plugins/shared/structs/units.go @@ -206,7 +206,7 @@ var ( { Name: UnitMHz, Base: UnitHertz, - Multiplier: Pow(1000, 1), + Multiplier: Pow(1000, 2), }, { Name: UnitGHz, diff --git a/scheduler/device.go b/scheduler/device.go new file mode 100644 index 000000000..9375a4000 --- /dev/null +++ b/scheduler/device.go @@ -0,0 +1,126 @@ +package scheduler + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// deviceAllocator is used to allocate devices to allocations. The allocator +// tracks availability as to not double allocate devices. +type deviceAllocator struct { + *structs.DeviceAccounter + + ctx Context +} + +// newDeviceAllocator returns a new device allocator. The node is used to +// populate the set of available devices based on what healthy device instances +// exist on the node. +func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { + return &deviceAllocator{ + ctx: ctx, + DeviceAccounter: structs.NewDeviceAccounter(n), + } +} + +// AssignDevice takes a device request and returns an assignment as well as a +// score for the assignment. If no assignment could be made, an error is +// returned explaining why. 
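The one-line change in plugins/shared/structs/units.go is easy to overlook but matters for device constraints: with the base-1000 scaling used by these unit definitions, MHz must carry a multiplier of 1000^2 (1e6) relative to hertz rather than 1000^1, otherwise a fingerprinted graphics_clock of "1480 MHz" could never satisfy the "> 1.4 GHz" constraint exercised in the allocator tests later in this diff. A standalone sanity check of that arithmetic, independent of the unit helpers:

```go
package main

import "fmt"

func main() {
	const (
		hzPerMHz = 1000 * 1000        // corrected multiplier: Pow(1000, 2)
		hzPerGHz = 1000 * 1000 * 1000 // Pow(1000, 3)
	)

	graphicsClock := 1480 * hzPerMHz // "1480 MHz", as on the mock 1080ti
	constraint := 1.4 * hzPerGHz     // "> 1.4 GHz", from the constraint test

	// With the corrected MHz multiplier the constraint can be satisfied.
	fmt.Println(float64(graphicsClock) > constraint) // true

	// With the old multiplier (1000^1) the clock would convert to only
	// 1,480,000 Hz and the comparison would fail.
	old := 1480 * 1000
	fmt.Println(float64(old) > constraint) // false
}
```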
+func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, score float64, err error) { + // Try to hot path + if len(d.Devices) == 0 { + return nil, 0.0, fmt.Errorf("no devices available") + } + if ask.Count == 0 { + return nil, 0.0, fmt.Errorf("invalid request of zero devices") + } + + // Hold the current best offer + var offer *structs.AllocatedDeviceResource + var offerScore float64 + + // Determine the devices that are feasible based on availability and + // constraints + for id, devInst := range d.Devices { + // Check if we have enough unused instances to use this + assignable := uint64(0) + for _, v := range devInst.Instances { + if v == 0 { + assignable++ + } + } + + // This device doesn't have enough instances + if assignable < ask.Count { + continue + } + + // Check if the device works + if !nodeDeviceMatches(d.ctx, devInst.Device, ask) { + continue + } + + // Score the choice + var choiceScore float64 + if l := len(ask.Affinities); l != 0 { + totalWeight := 0.0 + for _, a := range ask.Affinities { + // Resolve the targets + lVal, ok := resolveDeviceTarget(a.LTarget, devInst.Device) + if !ok { + continue + } + rVal, ok := resolveDeviceTarget(a.RTarget, devInst.Device) + if !ok { + continue + } + + totalWeight += a.Weight + + // Check if satisfied + if !checkAttributeAffinity(d.ctx, a.Operand, lVal, rVal) { + continue + } + choiceScore += a.Weight + } + + // normalize + choiceScore /= totalWeight + } + + // Only use the device if it is a higher score than we have already seen + if offer != nil && choiceScore < offerScore { + continue + } + + // Set the new highest score + offerScore = choiceScore + + // Build the choice + offer = &structs.AllocatedDeviceResource{ + Vendor: id.Vendor, + Type: id.Type, + Name: id.Name, + DeviceIDs: make([]string, 0, ask.Count), + } + + assigned := uint64(0) + for id, v := range devInst.Instances { + if v == 0 && assigned < ask.Count { + assigned++ + offer.DeviceIDs = append(offer.DeviceIDs, id) + if assigned == ask.Count { + break + } + } + } + } + + // Failed to find a match + if offer == nil { + return nil, 0.0, fmt.Errorf("no devices match request") + } + + return offer, offerScore, nil +} diff --git a/scheduler/device_test.go b/scheduler/device_test.go new file mode 100644 index 000000000..36643555f --- /dev/null +++ b/scheduler/device_test.go @@ -0,0 +1,358 @@ +package scheduler + +import ( + "testing" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/require" +) + +// deviceRequest takes the name, count and potential constraints and affinities +// and returns a device request. +func deviceRequest(name string, count uint64, + constraints []*structs.Constraint, affinities []*structs.Affinity) *structs.RequestedDevice { + return &structs.RequestedDevice{ + Name: name, + Count: count, + Constraints: constraints, + Affinities: affinities, + } +} + +// devNode returns a node containing two devices, an nvidia gpu and an intel +// FPGA. 
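The score returned by AssignDevice is a simple normalization: the weight of every affinity whose targets resolve against the device's attributes is added to a running total, the weights of the affinities that are actually satisfied are summed, and the satisfied sum is divided by the total, so a device matching everything scores 1.0 and one matching nothing scores 0. The standalone sketch below shows that normalization for positive weights; the affinity/scoreDevice names are illustrative, and the real code resolves ${driver.attr.*} targets and compares typed attributes via checkAttributeAffinity.

```go
package main

import "fmt"

// affinity is a pared-down stand-in for a device affinity: its weight and
// whether the device under consideration satisfies it.
type affinity struct {
	weight    float64
	satisfied bool
}

// scoreDevice mirrors the normalization in deviceAllocator.AssignDevice:
// divide the summed weight of satisfied affinities by the summed weight of
// all affinities that resolved against the device.
func scoreDevice(affs []affinity) float64 {
	if len(affs) == 0 {
		return 0
	}
	total, met := 0.0, 0.0
	for _, a := range affs {
		total += a.weight
		if a.satisfied {
			met += a.weight
		}
	}
	return met / total
}

func main() {
	// e.g. the memory_bandwidth and memory affinities hold but the
	// graphics_clock affinity does not.
	fmt.Println(scoreDevice([]affinity{
		{weight: 0.2, satisfied: true},
		{weight: 0.2, satisfied: true},
		{weight: 0.9, satisfied: false},
	})) // ≈ 0.31

	// A device satisfying its only affinity normalizes to 1.
	fmt.Println(scoreDevice([]affinity{{weight: 0.6, satisfied: true}}))
}
```

In scheduler/rank.go these per-request scores are accumulated, divided by the summed affinity weights across all device requests, and reported as a separate "devices" scoring dimension next to the binpack score.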
+func devNode() *structs.Node { + n := mock.NvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ + Type: "fpga", + Vendor: "intel", + Name: "F100", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: false, + }, + }, + }) + return n +} + +// multipleNvidiaNode returns a node containing multiple nvidia device types. +func multipleNvidiaNode() *structs.Node { + n := mock.NvidiaNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ + Type: "gpu", + Vendor: "nvidia", + Name: "2080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(11, psstructs.UnitGiB), + "cuda_cores": psstructs.NewIntAttribute(4352, ""), + "graphics_clock": psstructs.NewIntAttribute(1350, psstructs.UnitMHz), + "memory_bandwidth": psstructs.NewIntAttribute(14, psstructs.UnitGBPerS), + }, + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + Healthy: true, + }, + { + ID: uuid.Generate(), + Healthy: true, + }, + }, + }) + return n + +} + +// collectInstanceIDs returns the IDs of the device instances +func collectInstanceIDs(devices ...*structs.NodeDeviceResource) []string { + var out []string + for _, d := range devices { + for _, i := range d.Instances { + out = append(out, i.ID) + } + } + return out +} + +// Test that asking for a device that isn't fully specified works. +func TestDeviceAllocator_Allocate_GenericRequest(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("gpu", 1, nil, nil) + + out, score, err := d.AssignDevice(ask) + require.NotNil(out) + require.Zero(score) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(n.NodeResources.Devices[0]), out.DeviceIDs[0]) +} + +// Test that asking for a device that is fully specified works. 
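The allocator tests exercise the three request-name forms used throughout this change: a bare type ("gpu"), a vendor/type pair ("nvidia/gpu"), and a fully qualified vendor/type/name ("intel/fpga/F100"). The actual matching lives in nodeDeviceMatches and the RequestedDevice type outside this hunk; the sketch below is only a hypothetical illustration of how such a name could be split into the DeviceIdTuple fields, with unspecified parts left empty.

```go
package main

import (
	"fmt"
	"strings"
)

// idTuple mirrors the vendor/type/name triple of structs.DeviceIdTuple.
type idTuple struct {
	Vendor, Type, Name string
}

// parseDeviceName is a hypothetical helper (not taken from this diff) showing
// one way the three request forms from the tests could map onto the tuple.
func parseDeviceName(name string) idTuple {
	parts := strings.Split(name, "/")
	switch len(parts) {
	case 1: // "gpu": type only
		return idTuple{Type: parts[0]}
	case 2: // "nvidia/gpu": vendor and type
		return idTuple{Vendor: parts[0], Type: parts[1]}
	default: // "intel/fpga/F100": fully qualified
		return idTuple{Vendor: parts[0], Type: parts[1], Name: parts[2]}
	}
}

func main() {
	for _, n := range []string{"gpu", "nvidia/gpu", "intel/fpga/F100"} {
		fmt.Printf("%-16s => %+v\n", n, parseDeviceName(n))
	}
}
```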
+func TestDeviceAllocator_Allocate_FullyQualifiedRequest(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("intel/fpga/F100", 1, nil, nil) + + out, score, err := d.AssignDevice(ask) + require.NotNil(out) + require.Zero(score) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(n.NodeResources.Devices[1]), out.DeviceIDs[0]) +} + +// Test that asking for a device with too much count doesn't place +func TestDeviceAllocator_Allocate_NotEnoughInstances(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest("gpu", 4, nil, nil) + + out, _, err := d.AssignDevice(ask) + require.Nil(out) + require.Error(err) + require.Contains(err.Error(), "no devices match request") +} + +// Test that asking for a device with constraints works +func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { + n := multipleNvidiaNode() + nvidia0 := n.NodeResources.Devices[0] + nvidia1 := n.NodeResources.Devices[1] + + cases := []struct { + Name string + Constraints []*structs.Constraint + ExpectedDevice *structs.NodeDeviceResource + NoPlacement bool + }{ + { + Name: "gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + }, + }, + ExpectedDevice: nvidia1, + }, + { + Name: "gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: "<", + RTarget: "4000", + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "nvidia/gpu", + Constraints: []*structs.Constraint{ + // First two are shared across both devices + { + LTarget: "${driver.attr.memory_bandwidth}", + Operand: ">", + RTarget: "10 GB/s", + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + }, + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "intel/gpu", + NoPlacement: true, + }, + { + Name: "nvidia/gpu", + Constraints: []*structs.Constraint{ + { + LTarget: "${driver.attr.memory_bandwidth}", + Operand: ">", + RTarget: "10 GB/s", + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + }, + // Rules both out + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "2.4 GHz", + }, + }, + NoPlacement: true, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest(c.Name, 1, c.Constraints, nil) + + out, score, err := d.AssignDevice(ask) + if c.NoPlacement { + require.Nil(out) + } else { + require.NotNil(out) + require.Zero(score) + require.NoError(err) + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(c.ExpectedDevice), out.DeviceIDs[0]) + } + }) + } +} + +// Test that asking for a device with affinities works +func TestDeviceAllocator_Allocate_Affinities(t *testing.T) { + n := multipleNvidiaNode() + nvidia0 := n.NodeResources.Devices[0] + nvidia1 := n.NodeResources.Devices[1] + + cases := []struct { + Name string + Affinities []*structs.Affinity + ExpectedDevice *structs.NodeDeviceResource + ZeroScore 
bool + }{ + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + Weight: 0.6, + }, + }, + ExpectedDevice: nvidia1, + }, + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: "<", + RTarget: "4000", + Weight: 0.1, + }, + }, + ExpectedDevice: nvidia0, + }, + { + Name: "gpu", + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.cuda_cores}", + Operand: ">", + RTarget: "4000", + Weight: -0.2, + }, + }, + ExpectedDevice: nvidia0, + ZeroScore: true, + }, + { + Name: "nvidia/gpu", + Affinities: []*structs.Affinity{ + // First two are shared across both devices + { + LTarget: "${driver.attr.memory_bandwidth}", + Operand: ">", + RTarget: "10 GB/s", + Weight: 0.2, + }, + { + LTarget: "${driver.attr.memory}", + Operand: "is", + RTarget: "11264 MiB", + Weight: 0.2, + }, + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + Weight: 0.9, + }, + }, + ExpectedDevice: nvidia0, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + _, ctx := testContext(t) + d := newDeviceAllocator(ctx, n) + require.NotNil(d) + + // Build the request + ask := deviceRequest(c.Name, 1, nil, c.Affinities) + + out, score, err := d.AssignDevice(ask) + require.NotNil(out) + require.NoError(err) + if c.ZeroScore { + require.Zero(score) + } else { + require.NotZero(score) + } + + // Check that we got the nvidia device + require.Len(out.DeviceIDs, 1) + require.Contains(collectInstanceIDs(c.ExpectedDevice), out.DeviceIDs[0]) + }) + } +} diff --git a/scheduler/feasible.go b/scheduler/feasible.go index aa37e0f3b..872d5a81c 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -7,10 +7,9 @@ import ( "strconv" "strings" - psstructs "github.com/hashicorp/nomad/plugins/shared/structs" - "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) // FeasibleIterator is used to iteratively yield nodes that @@ -481,14 +480,12 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { // checkAffinity checks if a specific affinity is satisfied func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool { - switch operand { - case structs.ConstraintSetContainsAny: - return checkSetContainsAny(lVal, rVal) - case structs.ConstraintSetContainsAll, structs.ConstraintSetContains: - return checkSetContainsAll(ctx, lVal, rVal) - default: - return checkConstraint(ctx, operand, lVal, rVal) - } + return checkConstraint(ctx, operand, lVal, rVal) +} + +// checkAttributeAffinity checks if an affinity is satisfied +func checkAttributeAffinity(ctx Context, operand string, lVal, rVal *psstructs.Attribute) bool { + return checkAttributeConstraint(ctx, operand, lVal, rVal) } // checkLexicalOrder is used to check for lexical ordering @@ -896,26 +893,24 @@ OUTER: continue } + // First check we have enough instances of the device since this is + // cheaper than checking the constraints + if unused < desiredCount { + continue + } + + // Check the constriants if nodeDeviceMatches(c.ctx, d, req) { // Consume the instances - if unused >= desiredCount { - // This device satisfies all our requests - available[d] -= desiredCount + available[d] -= desiredCount - // Move on to the next request - continue OUTER - } else { - // This device partially satisfies our requests - available[d] = 0 - desiredCount -= 
unused - } + // Move on to the next request + continue OUTER } } // We couldn't match the request for the device - if desiredCount > 0 { - return false - } + return false } // Only satisfied if there are no more devices to place diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index d1d5f6160..abbdcb484 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1714,22 +1714,6 @@ func TestDeviceChecker(t *testing.T) { }, } - intel := &structs.NodeDeviceResource{ - Vendor: "intel", - Type: "gpu", - Name: "GT640", - Instances: []*structs.NodeDevice{ - { - ID: uuid.Generate(), - Healthy: true, - }, - { - ID: uuid.Generate(), - Healthy: false, - }, - }, - } - cases := []struct { Name string Result bool @@ -1796,12 +1780,6 @@ func TestDeviceChecker(t *testing.T) { NodeDevices: []*structs.NodeDeviceResource{nvidia}, RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, }, - { - Name: "request split over groups", - Result: true, - NodeDevices: []*structs.NodeDeviceResource{nvidia, intel}, - RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, - }, { Name: "meets constraints requirement", Result: true, diff --git a/scheduler/rank.go b/scheduler/rank.go index 5e7ff8a3c..e8ee27ae8 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -191,6 +191,14 @@ OUTER: netIdx.SetNode(option.Node) netIdx.AddAllocs(proposed) + // Create a device allocator + devAllocator := newDeviceAllocator(iter.ctx, option.Node) + devAllocator.AddAllocs(proposed) + + // Track the affinities of the devices + totalDeviceAffinityWeight := 0.0 + deviceAffinityScore := 0.0 + // Assign the resources for each task total := &structs.AllocatedResources{ Tasks: make(map[string]*structs.AllocatedTaskResources, @@ -273,6 +281,27 @@ OUTER: taskResources.Networks = []*structs.NetworkResource{offer} } + // Check if we need to assign devices + for _, req := range task.Resources.Devices { + offer, score, err := devAllocator.AssignDevice(req) + if offer == nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) + continue OUTER + } + + // Store the resource + devAllocator.AddReserved(offer) + taskResources.Devices = append(taskResources.Devices, offer) + + // Add the scores + if len(req.Affinities) != 0 { + for _, a := range req.Affinities { + totalDeviceAffinityWeight += a.Weight + } + deviceAffinityScore += score + } + } + // Store the task resource option.SetTaskResources(task, taskResources) @@ -286,8 +315,8 @@ OUTER: // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) - // Check if these allocations fit - fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx) + // Check if these allocations fit, if they do not, simply skip this node + fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx, false) netIdx.Release() if !fit { // Skip the node if evictions are not enabled @@ -321,6 +350,14 @@ OUTER: normalizedFit := fitness / binPackingMaxFitScore option.Scores = append(option.Scores, normalizedFit) iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit) + + // Score the device affinity + if totalDeviceAffinityWeight != 0 { + deviceAffinityScore /= totalDeviceAffinityWeight + option.Scores = append(option.Scores, deviceAffinityScore) + iter.ctx.Metrics().ScoreNode(option.Node, "devices", deviceAffinityScore) + } + return option } } diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index b60d7716e..61e3dee7a 100644 --- 
a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -467,6 +467,329 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { } } +// This is a fairly high level test that asserts the bin packer uses the device +// allocator properly. It is not intended to handle every possible device +// request versus availability scenario. That should be covered in device +// allocator tests. +func TestBinPackIterator_Devices(t *testing.T) { + nvidiaNode := mock.NvidiaNode() + devs := nvidiaNode.NodeResources.Devices[0].Instances + nvidiaDevices := []string{devs[0].ID, devs[1].ID} + + nvidiaDev0 := mock.Alloc() + nvidiaDev0.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{ + { + Type: "gpu", + Vendor: "nvidia", + Name: "1080ti", + DeviceIDs: []string{nvidiaDevices[0]}, + }, + } + + type devPlacementTuple struct { + Count int + ExcludeIDs []string + } + + cases := []struct { + Name string + Node *structs.Node + PlannedAllocs []*structs.Allocation + ExistingAllocs []*structs.Allocation + TaskGroup *structs.TaskGroup + NoPlace bool + ExpectedPlacements map[string]map[structs.DeviceIdTuple]devPlacementTuple + DeviceScore float64 + }{ + { + Name: "single request, match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + }, + }, + }, + }, + { + Name: "single request multiple count, match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 2, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 2, + }, + }, + }, + }, + { + Name: "single request, with affinities", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + Affinities: []*structs.Affinity{ + { + LTarget: "${driver.attr.graphics_clock}", + Operand: ">", + RTarget: "1.4 GHz", + Weight: 0.9, + }, + }, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + }, + }, + }, + DeviceScore: 0.9, + }, + { + Name: "single request over count, no match", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 6, + }, + }, + }, + }, + }, + }, + NoPlace: true, + }, + { + Name: "single request no device of matching type", + Node: 
nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "fpga", + Count: 1, + }, + }, + }, + }, + }, + }, + NoPlace: true, + }, + { + Name: "single request with previous uses", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + ExcludeIDs: []string{nvidiaDevices[0]}, + }, + }, + }, + ExistingAllocs: []*structs.Allocation{nvidiaDev0}, + }, + { + Name: "single request with planned uses", + Node: nvidiaNode, + TaskGroup: &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Devices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + }, + }, + }, + }, + }, + }, + ExpectedPlacements: map[string]map[structs.DeviceIdTuple]devPlacementTuple{ + "web": map[structs.DeviceIdTuple]devPlacementTuple{ + { + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + }: { + Count: 1, + ExcludeIDs: []string{nvidiaDevices[0]}, + }, + }, + }, + PlannedAllocs: []*structs.Allocation{nvidiaDev0}, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + require := require.New(t) + + // Setup the context + state, ctx := testContext(t) + + // Add the planned allocs + if len(c.PlannedAllocs) != 0 { + for _, alloc := range c.PlannedAllocs { + alloc.NodeID = c.Node.ID + } + plan := ctx.Plan() + plan.NodeAllocation[c.Node.ID] = c.PlannedAllocs + } + + // Add the existing allocs + if len(c.ExistingAllocs) != 0 { + for _, alloc := range c.ExistingAllocs { + alloc.NodeID = c.Node.ID + } + require.NoError(state.UpsertAllocs(1000, c.ExistingAllocs)) + } + + static := NewStaticRankIterator(ctx, []*RankedNode{&RankedNode{Node: c.Node}}) + binp := NewBinPackIterator(ctx, static, false, 0) + binp.SetTaskGroup(c.TaskGroup) + + out := binp.Next() + if out == nil && !c.NoPlace { + t.Fatalf("expected placement") + } + + // Check we got the placements we are expecting + for tname, devices := range c.ExpectedPlacements { + tr, ok := out.TaskResources[tname] + require.True(ok) + + want := len(devices) + got := 0 + for _, placed := range tr.Devices { + got++ + + expected, ok := devices[*placed.ID()] + require.True(ok) + require.Equal(expected.Count, len(placed.DeviceIDs)) + for _, id := range expected.ExcludeIDs { + require.NotContains(placed.DeviceIDs, id) + } + } + + require.Equal(want, got) + } + + // Check potential affinity scores + if c.DeviceScore != 0.0 { + require.Len(out.Scores, 2) + require.Equal(out.Scores[1], c.DeviceScore) + } + }) + } +} + func TestJobAntiAffinity_PlannedAlloc(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{