Merge pull request #4783 from hashicorp/f-device-scheduling

Device constraint checking
This commit is contained in:
Alex Dadgar 2018-10-15 15:31:32 -07:00 committed by GitHub
commit 06719d0b5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 1293 additions and 131 deletions

View file

@ -655,8 +655,8 @@ func parseAffinities(result *[]*api.Affinity, list *ast.ObjectList) error {
// If "set_contains_any" is provided, set the operand
// to "set_contains_any" and the value to the "RTarget"
if affinity, ok := m[structs.ConstraintSetContaintsAny]; ok {
m["Operand"] = structs.ConstraintSetContaintsAny
if affinity, ok := m[structs.ConstraintSetContainsAny]; ok {
m["Operand"] = structs.ConstraintSetContainsAny
m["RTarget"] = affinity
}

View file

@ -27,9 +27,13 @@ import (
"container/heap"
"math"
hcodec "github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/gorhill/cronexpr"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
@ -38,8 +42,6 @@ import (
"github.com/hashicorp/nomad/lib/kheap"
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
hcodec "github.com/hashicorp/go-msgpack/codec"
)
var (
@ -1721,6 +1723,26 @@ func (r *Resources) DiskInBytes() int64 {
return int64(r.DiskMB * BytesInMegabyte)
}
func (r *Resources) Validate() error {
var mErr multierror.Error
if err := r.MeetsMinResources(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Ensure the task isn't asking for disk resources
if r.DiskMB > 0 {
mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level."))
}
for i, d := range r.Devices {
if err := d.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("device %d failed validation: %v", i+1, err))
}
}
return mErr.ErrorOrNil()
}
// Merge merges this resource with another resource.
func (r *Resources) Merge(other *Resources) {
if other.CPU != 0 {
@ -2033,7 +2055,6 @@ type RequestedDevice struct {
// Count is the number of requested devices
Count uint64
// TODO validate
// Constraints are a set of constraints to apply when selecting the device
// to use.
Constraints []*Constraint
@ -2055,20 +2076,82 @@ func (r *RequestedDevice) Copy() *RequestedDevice {
return &nr
}
func (r *RequestedDevice) ID() *DeviceIdTuple {
if r == nil || r.Name == "" {
return nil
}
parts := strings.SplitN(r.Name, "/", 3)
switch len(parts) {
case 1:
return &DeviceIdTuple{
Type: parts[0],
}
case 2:
return &DeviceIdTuple{
Vendor: parts[0],
Type: parts[1],
}
default:
return &DeviceIdTuple{
Vendor: parts[0],
Type: parts[1],
Name: parts[2],
}
}
}
func (r *RequestedDevice) Validate() error {
if r == nil {
return nil
}
var mErr multierror.Error
if r.Name == "" {
multierror.Append(&mErr, errors.New("device name must be given as one of the following: type, vendor/type, or vendor/type/name"))
}
for idx, constr := range r.Constraints {
// Ensure that the constraint doesn't use an operand we do not allow
switch constr.Operand {
case ConstraintDistinctHosts, ConstraintDistinctProperty:
outer := fmt.Errorf("Constraint %d validation failed: using unsupported operand %q", idx+1, constr.Operand)
multierror.Append(&mErr, outer)
default:
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
multierror.Append(&mErr, outer)
}
}
}
for idx, affinity := range r.Affinities {
if err := affinity.Validate(); err != nil {
outer := fmt.Errorf("Affinity %d validation failed: %s", idx+1, err)
multierror.Append(&mErr, outer)
}
}
return mErr.ErrorOrNil()
}
// NodeResources is used to define the resources available on a client node.
type NodeResources struct {
Cpu NodeCpuResources
Memory NodeMemoryResources
Disk NodeDiskResources
Networks Networks
Devices []*NodeDeviceResource
}
func (n *NodeResources) Copy() *NodeResources {
if n == nil {
return nil
}
newN := new(NodeResources)
*newN = *n
// Copy the networks
if n.Networks != nil {
networks := len(n.Networks)
newN.Networks = make([]*NetworkResource, networks)
@ -2076,6 +2159,16 @@ func (n *NodeResources) Copy() *NodeResources {
newN.Networks[i] = n.Networks[i].Copy()
}
}
// Copy the devices
if n.Devices != nil {
devices := len(n.Devices)
newN.Devices = make([]*NodeDeviceResource, devices)
for i := 0; i < devices; i++ {
newN.Devices[i] = n.Devices[i].Copy()
}
}
return newN
}
@ -2115,6 +2208,10 @@ func (n *NodeResources) Merge(o *NodeResources) {
if len(o.Networks) != 0 {
n.Networks = o.Networks
}
if len(o.Devices) != 0 {
n.Devices = o.Devices
}
}
func (n *NodeResources) Equals(o *NodeResources) bool {
@ -2145,6 +2242,20 @@ func (n *NodeResources) Equals(o *NodeResources) bool {
}
}
// Check the devices
if len(n.Devices) != len(o.Devices) {
return false
}
idMap := make(map[DeviceIdTuple]*NodeDeviceResource, len(n.Devices))
for _, d := range n.Devices {
idMap[*d.ID()] = d
}
for _, otherD := range o.Devices {
if d, ok := idMap[*otherD.ID()]; !ok || !d.Equals(otherD) {
return false
}
}
return true
}
@ -2244,6 +2355,208 @@ func (n *NodeDiskResources) Equals(o *NodeDiskResources) bool {
return true
}
// DeviceIdTuple is the tuple that identifies a device
type DeviceIdTuple struct {
Vendor string
Type string
Name string
}
// Matches returns if this Device ID is a superset of the passed ID.
func (id *DeviceIdTuple) Matches(other *DeviceIdTuple) bool {
if other == nil {
return false
}
if other.Name != "" && other.Name != id.Name {
return false
}
if other.Vendor != "" && other.Vendor != id.Vendor {
return false
}
if other.Type != "" && other.Type != id.Type {
return false
}
return true
}
// NodeDeviceResource captures a set of devices sharing a common
// vendor/type/device_name tuple.
type NodeDeviceResource struct {
Vendor string
Type string
Name string
Instances []*NodeDevice
Attributes map[string]*psstructs.Attribute
}
func (n *NodeDeviceResource) ID() *DeviceIdTuple {
if n == nil {
return nil
}
return &DeviceIdTuple{
Vendor: n.Vendor,
Type: n.Type,
Name: n.Name,
}
}
func (n *NodeDeviceResource) Copy() *NodeDeviceResource {
if n == nil {
return nil
}
// Copy the primitives
nn := *n
// Copy the device instances
if l := len(nn.Instances); l != 0 {
nn.Instances = make([]*NodeDevice, 0, l)
for _, d := range n.Instances {
nn.Instances = append(nn.Instances, d.Copy())
}
}
// Copy the Attributes
nn.Attributes = psstructs.CopyMapStringAttribute(nn.Attributes)
return &nn
}
func (n *NodeDeviceResource) Equals(o *NodeDeviceResource) bool {
if o == nil && n == nil {
return true
} else if o == nil {
return false
} else if n == nil {
return false
}
if n.Vendor != o.Vendor {
return false
} else if n.Type != o.Type {
return false
} else if n.Name != o.Name {
return false
}
// Check the attributes
if len(n.Attributes) != len(o.Attributes) {
return false
}
for k, v := range n.Attributes {
if otherV, ok := o.Attributes[k]; !ok || v != otherV {
return false
}
}
// Check the instances
if len(n.Instances) != len(o.Instances) {
return false
}
idMap := make(map[string]*NodeDevice, len(n.Instances))
for _, d := range n.Instances {
idMap[d.ID] = d
}
for _, otherD := range o.Instances {
if d, ok := idMap[otherD.ID]; !ok || !d.Equals(otherD) {
return false
}
}
return true
}
// NodeDevice is an instance of a particular device.
type NodeDevice struct {
// ID is the ID of the device.
ID string
// Healthy captures whether the device is healthy.
Healthy bool
// HealthDescription is used to provide a human readable description of why
// the device may be unhealthy.
HealthDescription string
// Locality stores HW locality information for the node to optionally be
// used when making placement decisions.
Locality *NodeDeviceLocality
}
func (n *NodeDevice) Equals(o *NodeDevice) bool {
if o == nil && n == nil {
return true
} else if o == nil {
return false
} else if n == nil {
return false
}
if n.ID != o.ID {
return false
} else if n.Healthy != o.Healthy {
return false
} else if n.HealthDescription != o.HealthDescription {
return false
} else if !n.Locality.Equals(o.Locality) {
return false
}
return false
}
func (n *NodeDevice) Copy() *NodeDevice {
if n == nil {
return nil
}
// Copy the primitives
nn := *n
// Copy the locality
nn.Locality = nn.Locality.Copy()
return &nn
}
// NodeDeviceLocality stores information about the devices hardware locality on
// the node.
type NodeDeviceLocality struct {
// PciBusID is the PCI Bus ID for the device.
PciBusID string
}
func (n *NodeDeviceLocality) Equals(o *NodeDeviceLocality) bool {
if o == nil && n == nil {
return true
} else if o == nil {
return false
} else if n == nil {
return false
}
if n.PciBusID != o.PciBusID {
return false
}
return true
}
func (n *NodeDeviceLocality) Copy() *NodeDeviceLocality {
if n == nil {
return nil
}
// Copy the primitives
nn := *n
return &nn
}
// NodeReservedResources is used to capture the resources on a client node that
// should be reserved and not made available to jobs.
type NodeReservedResources struct {
@ -4783,15 +5096,8 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk, jobType string) error {
// Validate the resources.
if t.Resources == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing task resources"))
} else {
if err := t.Resources.MeetsMinResources(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Ensure the task isn't asking for disk resources
if t.Resources.DiskMB > 0 {
mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level."))
}
} else if err := t.Resources.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Validate the log config
@ -5804,7 +6110,7 @@ const (
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
ConstraintSetContainsAll = "set_contains_all"
ConstraintSetContaintsAny = "set_contains_any"
ConstraintSetContainsAny = "set_contains_any"
)
// Constraints are used to restrict placement options.
@ -5853,7 +6159,7 @@ func (c *Constraint) Validate() error {
switch c.Operand {
case ConstraintDistinctHosts:
requireLtarget = false
case ConstraintSetContains:
case ConstraintSetContainsAll, ConstraintSetContainsAny, ConstraintSetContains:
if c.RTarget == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Set contains constraint requires an RTarget"))
}
@ -5933,7 +6239,7 @@ func (a *Affinity) Validate() error {
// Perform additional validation based on operand
switch a.Operand {
case ConstraintSetContainsAll, ConstraintSetContaintsAny, ConstraintSetContains:
case ConstraintSetContainsAll, ConstraintSetContainsAny, ConstraintSetContains:
if a.RTarget == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Set contains operators require an RTarget"))
}

View file

@ -1571,13 +1571,15 @@ func TestConstraint_Validate(t *testing.T) {
t.Fatalf("expected valid constraint: %v", err)
}
// Perform set_contains validation
c.Operand = ConstraintSetContains
// Perform set_contains* validation
c.RTarget = ""
err = c.Validate()
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "requires an RTarget") {
t.Fatalf("err: %s", err)
for _, o := range []string{ConstraintSetContains, ConstraintSetContainsAll, ConstraintSetContainsAny} {
c.Operand = o
err = c.Validate()
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "requires an RTarget") {
t.Fatalf("err: %s", err)
}
}
// Perform LTarget validation

View file

@ -53,6 +53,53 @@ func (u *Unit) Comparable(o *Unit) bool {
return u.Base == o.Base
}
// ParseAttribute takes a string and parses it into an attribute, pulling out
// units if they are specified as a suffix on a number.
func ParseAttribute(input string) *Attribute {
ll := len(input)
if ll == 0 {
return &Attribute{String: helper.StringToPtr(input)}
}
// Check if the string is a number ending with potential units
var unit string
numeric := input
if unicode.IsLetter(rune(input[ll-1])) {
// Try suffix matching
for _, u := range lengthSortedUnits {
if strings.HasSuffix(input, u) {
unit = u
break
}
}
// Check if we know about the unit.
if len(unit) != 0 {
numeric = strings.TrimSpace(strings.TrimSuffix(input, unit))
}
}
// Try to parse as an int
i, err := strconv.ParseInt(numeric, 10, 64)
if err == nil {
return &Attribute{Int: helper.Int64ToPtr(i), Unit: unit}
}
// Try to parse as a float
f, err := strconv.ParseFloat(numeric, 64)
if err == nil {
return &Attribute{Float: helper.Float64ToPtr(f), Unit: unit}
}
// Try to parse as a bool
b, err := strconv.ParseBool(input)
if err == nil {
return &Attribute{Bool: helper.BoolToPtr(b)}
}
return &Attribute{String: helper.StringToPtr(input)}
}
// Attribute is used to describe the value of an attribute, optionally
// specifying units
type Attribute struct {
@ -72,6 +119,104 @@ type Attribute struct {
Unit string
}
// NewStringAttribute returns a new string attribute.
func NewStringAttribute(s string) *Attribute {
return &Attribute{
String: helper.StringToPtr(s),
}
}
// NewBoolAttribute returns a new boolean attribute.
func NewBoolAttribute(b bool) *Attribute {
return &Attribute{
Bool: helper.BoolToPtr(b),
}
}
// NewIntergerAttribute returns a new integer attribute. The unit is not checked
// to be valid.
func NewIntAttribute(i int64, unit string) *Attribute {
return &Attribute{
Int: helper.Int64ToPtr(i),
Unit: unit,
}
}
// NewFloatAttribute returns a new float attribute. The unit is not checked to
// be valid.
func NewFloatAttribute(f float64, unit string) *Attribute {
return &Attribute{
Float: helper.Float64ToPtr(f),
Unit: unit,
}
}
// GetString returns the string value of the attribute or false if the attribute
// doesn't contain a string.
func (a *Attribute) GetString() (value string, ok bool) {
if a.String == nil {
return "", false
}
return *a.String, true
}
// GetBool returns the boolean value of the attribute or false if the attribute
// doesn't contain a boolean.
func (a *Attribute) GetBool() (value bool, ok bool) {
if a.Bool == nil {
return false, false
}
return *a.Bool, true
}
// GetInt returns the integer value of the attribute or false if the attribute
// doesn't contain a integer.
func (a *Attribute) GetInt() (value int64, ok bool) {
if a.Int == nil {
return 0, false
}
return *a.Int, true
}
// GetFloat returns the float value of the attribute or false if the attribute
// doesn't contain a float.
func (a *Attribute) GetFloat() (value float64, ok bool) {
if a.Float == nil {
return 0.0, false
}
return *a.Float, true
}
// Copy returns a copied version of the attribute
func (a *Attribute) Copy() *Attribute {
if a == nil {
return nil
}
ca := &Attribute{
Unit: a.Unit,
}
if a.Float != nil {
ca.Float = helper.Float64ToPtr(*a.Float)
}
if a.Int != nil {
ca.Int = helper.Int64ToPtr(*a.Int)
}
if a.Bool != nil {
ca.Bool = helper.BoolToPtr(*a.Bool)
}
if a.String != nil {
ca.String = helper.StringToPtr(*a.String)
}
return ca
}
// GoString returns a string representation of the attribute
func (a *Attribute) GoString() string {
if a == nil {
@ -133,6 +278,39 @@ func (a *Attribute) Validate() error {
return nil
}
// Comparable returns whether the two attributes are comparable
func (a *Attribute) Comparable(b *Attribute) bool {
if a == nil || b == nil {
return false
}
// First use the units to decide if comparison is possible
aUnit := a.getTypedUnit()
bUnit := b.getTypedUnit()
if aUnit != nil && bUnit != nil {
return aUnit.Comparable(bUnit)
} else if aUnit != nil && bUnit == nil {
return false
} else if aUnit == nil && bUnit != nil {
return false
}
if a.String != nil {
if b.String != nil {
return true
}
return false
}
if a.Bool != nil {
if b.Bool != nil {
return true
}
return false
}
return true
}
// Compare compares two attributes. If the returned boolean value is false, it
// means the values are not comparable, either because they are of different
// types (bool versus int) or the units are incompatible for comparison.
@ -278,102 +456,7 @@ func (a *Attribute) getInt() int64 {
return i
}
// Comparable returns whether they are comparable
func (a *Attribute) Comparable(b *Attribute) bool {
if a == nil || b == nil {
return false
}
// First use the units to decide if comparison is possible
aUnit := a.getTypedUnit()
bUnit := b.getTypedUnit()
if aUnit != nil && bUnit != nil {
return aUnit.Comparable(bUnit)
} else if aUnit != nil && bUnit == nil {
return false
} else if aUnit == nil && bUnit != nil {
return false
}
if a.String != nil {
if b.String != nil {
return true
}
return false
}
if a.Bool != nil {
if b.Bool != nil {
return true
}
return false
}
return true
}
// getTypedUnit returns the Unit for the attribute or nil if no unit exists.
func (a *Attribute) getTypedUnit() *Unit {
return UnitIndex[a.Unit]
}
// ParseAttribute takes a string and parses it into an attribute, pulling out
// units if they are specified as a suffix on a number
func ParseAttribute(input string) *Attribute {
ll := len(input)
if ll == 0 {
return &Attribute{String: helper.StringToPtr(input)}
}
// Try to parse as a bool
b, err := strconv.ParseBool(input)
if err == nil {
return &Attribute{Bool: helper.BoolToPtr(b)}
}
// Check if the string is a number ending with potential units
if unicode.IsLetter(rune(input[ll-1])) {
// Try suffix matching
var unit string
for _, u := range lengthSortedUnits {
if strings.HasSuffix(input, u) {
unit = u
break
}
}
// Check if we know about the unit. If we don't we can only treat this
// as a string
if len(unit) == 0 {
return &Attribute{String: helper.StringToPtr(input)}
}
// Grab the numeric
numeric := strings.TrimSpace(strings.TrimSuffix(input, unit))
// Try to parse as an int
i, err := strconv.ParseInt(numeric, 10, 64)
if err == nil {
return &Attribute{Int: helper.Int64ToPtr(i), Unit: unit}
}
// Try to parse as a float
f, err := strconv.ParseFloat(numeric, 64)
if err == nil {
return &Attribute{Float: helper.Float64ToPtr(f), Unit: unit}
}
}
// Try to parse as an int
i, err := strconv.ParseInt(input, 10, 64)
if err == nil {
return &Attribute{Int: helper.Int64ToPtr(i)}
}
// Try to parse as a float
f, err := strconv.ParseFloat(input, 64)
if err == nil {
return &Attribute{Float: helper.Float64ToPtr(f)}
}
return &Attribute{String: helper.StringToPtr(input)}
}

View file

@ -558,6 +558,12 @@ func TestAttribute_ParseAndValidate(t *testing.T) {
Bool: helper.BoolToPtr(false),
},
},
{
Input: "1",
Expected: &Attribute{
Int: helper.Int64ToPtr(1),
},
},
{
Input: "100",
Expected: &Attribute{

View file

@ -88,3 +88,16 @@ func Pow(a, b int64) int64 {
}
return p
}
// CopyMapStringAttribute copies a map of string to Attribute
func CopyMapStringAttribute(in map[string]*Attribute) map[string]*Attribute {
if in == nil {
return nil
}
out := make(map[string]*Attribute, len(in))
for k, v := range in {
out[k] = v.Copy()
}
return out
}

View file

@ -7,6 +7,8 @@ import (
"strconv"
"strings"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -468,8 +470,10 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
return checkVersionMatch(ctx, lVal, rVal)
case structs.ConstraintRegex:
return checkRegexpMatch(ctx, lVal, rVal)
case structs.ConstraintSetContains:
case structs.ConstraintSetContains, structs.ConstraintSetContainsAll:
return checkSetContainsAll(ctx, lVal, rVal)
case structs.ConstraintSetContainsAny:
return checkSetContainsAny(lVal, rVal)
default:
return false
}
@ -478,7 +482,7 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
// checkAffinity checks if a specific affinity is satisfied
func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool {
switch operand {
case structs.ConstraintSetContaintsAny:
case structs.ConstraintSetContainsAny:
return checkSetContainsAny(lVal, rVal)
case structs.ConstraintSetContainsAll, structs.ConstraintSetContains:
return checkSetContainsAll(ctx, lVal, rVal)
@ -556,6 +560,48 @@ func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool {
return constraints.Check(vers)
}
// checkAttributeVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkAttributeVersionMatch(ctx Context, lVal, rVal *psstructs.Attribute) bool {
// Parse the version
var versionStr string
if s, ok := lVal.GetString(); ok {
versionStr = s
} else if i, ok := lVal.GetInt(); ok {
versionStr = fmt.Sprintf("%d", i)
} else {
return false
}
// Parse the version
vers, err := version.NewVersion(versionStr)
if err != nil {
return false
}
// Constraint must be a string
constraintStr, ok := rVal.GetString()
if !ok {
return false
}
// Check the cache for a match
cache := ctx.VersionConstraintCache()
constraints := cache[constraintStr]
// Parse the constraints
if constraints == nil {
constraints, err = version.NewConstraint(constraintStr)
if err != nil {
return false
}
cache[constraintStr] = constraints
}
// Check the constraints against the version
return constraints.Check(vers)
}
// checkRegexpMatch is used to compare a value on the
// left hand side with a regexp on the right hand side
func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool {
@ -768,3 +814,238 @@ OUTER:
return option
}
}
// DeviceChecker is a FeasibilityChecker which returns whether a node has the
// devices necessary to scheduler a task group.
type DeviceChecker struct {
ctx Context
// required is the set of requested devices that must exist on the node
required []*structs.RequestedDevice
// requiresDevices indicates if the task group requires devices
requiresDevices bool
}
// NewDeviceChecker creates a DeviceChecker
func NewDeviceChecker(ctx Context) *DeviceChecker {
return &DeviceChecker{
ctx: ctx,
}
}
func (c *DeviceChecker) SetTaskGroup(tg *structs.TaskGroup) {
c.required = nil
for _, task := range tg.Tasks {
c.required = append(c.required, task.Resources.Devices...)
}
c.requiresDevices = len(c.required) != 0
}
func (c *DeviceChecker) Feasible(option *structs.Node) bool {
if c.hasDevices(option) {
return true
}
c.ctx.Metrics().FilterNode(option, "missing devices")
return false
}
func (c *DeviceChecker) hasDevices(option *structs.Node) bool {
if !c.requiresDevices {
return true
}
// COMPAT(0.11): Remove in 0.11
// The node does not have the new resources object so it can not have any
// devices
if option.NodeResources == nil {
return false
}
// Check if the node has any devices
nodeDevs := option.NodeResources.Devices
if len(nodeDevs) == 0 {
return false
}
// Create a mapping of node devices to the remaining count
available := make(map[*structs.NodeDeviceResource]uint64, len(nodeDevs))
for _, d := range nodeDevs {
var healthy uint64 = 0
for _, instance := range d.Instances {
if instance.Healthy {
healthy++
}
}
if healthy != 0 {
available[d] = healthy
}
}
// Go through the required devices trying to find matches
OUTER:
for _, req := range c.required {
// Determine how many there are to place
desiredCount := req.Count
// Go through the device resources and see if we have a match
for d, unused := range available {
if unused == 0 {
// Depleted
continue
}
if nodeDeviceMatches(c.ctx, d, req) {
// Consume the instances
if unused >= desiredCount {
// This device satisfies all our requests
available[d] -= desiredCount
// Move on to the next request
continue OUTER
} else {
// This device partially satisfies our requests
available[d] = 0
desiredCount -= unused
}
}
}
// We couldn't match the request for the device
if desiredCount > 0 {
return false
}
}
// Only satisfied if there are no more devices to place
return true
}
// nodeDeviceMatches checks if the device matches the request and its
// constraints. It doesn't check the count.
func nodeDeviceMatches(ctx Context, d *structs.NodeDeviceResource, req *structs.RequestedDevice) bool {
if !d.ID().Matches(req.ID()) {
return false
}
// There are no constraints to consider
if len(req.Constraints) == 0 {
return true
}
for _, c := range req.Constraints {
// Resolve the targets
lVal, ok := resolveDeviceTarget(c.LTarget, d)
if !ok {
return false
}
rVal, ok := resolveDeviceTarget(c.RTarget, d)
if !ok {
return false
}
// Check if satisfied
if !checkAttributeConstraint(ctx, c.Operand, lVal, rVal) {
return false
}
}
return true
}
// resolveDeviceTarget is used to resolve the LTarget and RTarget of a Constraint
// when used on a device
func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstructs.Attribute, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return psstructs.ParseAttribute(target), true
}
// Handle the interpolations
switch {
case "${driver.model}" == target:
return psstructs.NewStringAttribute(d.Name), true
case "${driver.vendor}" == target:
return psstructs.NewStringAttribute(d.Vendor), true
case "${driver.type}" == target:
return psstructs.NewStringAttribute(d.Type), true
case strings.HasPrefix(target, "${driver.attr."):
attr := strings.TrimPrefix(target, "${driver.attr.")
attr = strings.TrimSuffix(attr, "}")
val, ok := d.Attributes[attr]
return val, ok
default:
return nil, false
}
}
// checkAttributeConstraint checks if a constraint is satisfied
func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute) bool {
// Check for constraints not handled by this checker.
switch operand {
case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty:
return true
default:
break
}
switch operand {
case "<", "<=", ">", ">=", "=", "==", "is", "!=", "not":
v, ok := lVal.Compare(rVal)
if !ok {
return false
}
switch operand {
case "not", "!=":
return v != 0
case "is", "==", "=":
return v == 0
case "<":
return v == -1
case "<=":
return v != 1
case ">":
return v == 1
case ">=":
return v != -1
default:
return false
}
case structs.ConstraintVersion:
return checkAttributeVersionMatch(ctx, lVal, rVal)
case structs.ConstraintRegex:
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkRegexpMatch(ctx, ls, rs)
case structs.ConstraintSetContains, structs.ConstraintSetContainsAll:
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkSetContainsAll(ctx, ls, rs)
case structs.ConstraintSetContainsAny:
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkSetContainsAny(ls, rs)
default:
return false
}
return false
}

View file

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/stretchr/testify/require"
)
@ -1619,3 +1620,458 @@ func TestSetContainsAny(t *testing.T) {
require.True(t, checkSetContainsAny("a", "a"))
require.False(t, checkSetContainsAny("b", "a"))
}
func TestDeviceChecker(t *testing.T) {
getTg := func(devices ...*structs.RequestedDevice) *structs.TaskGroup {
return &structs.TaskGroup{
Name: "example",
Tasks: []*structs.Task{
{
Resources: &structs.Resources{
Devices: devices,
},
},
},
}
}
// Just type
gpuTypeReq := &structs.RequestedDevice{
Name: "gpu",
Count: 1,
}
fpgaTypeReq := &structs.RequestedDevice{
Name: "fpga",
Count: 1,
}
// vendor/type
gpuVendorTypeReq := &structs.RequestedDevice{
Name: "nvidia/gpu",
Count: 1,
}
fpgaVendorTypeReq := &structs.RequestedDevice{
Name: "nvidia/fpga",
Count: 1,
}
// vendor/type/model
gpuFullReq := &structs.RequestedDevice{
Name: "nvidia/gpu/1080ti",
Count: 1,
}
fpgaFullReq := &structs.RequestedDevice{
Name: "nvidia/fpga/F100",
Count: 1,
}
// Just type but high count
gpuTypeHighCountReq := &structs.RequestedDevice{
Name: "gpu",
Count: 3,
}
getNode := func(devices ...*structs.NodeDeviceResource) *structs.Node {
n := mock.Node()
n.NodeResources.Devices = devices
return n
}
nvidia := &structs.NodeDeviceResource{
Vendor: "nvidia",
Type: "gpu",
Name: "1080ti",
Attributes: map[string]*psstructs.Attribute{
"memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB),
"pci_bandwidth": psstructs.NewIntAttribute(995, psstructs.UnitMiBPerS),
"cores_clock": psstructs.NewIntAttribute(800, psstructs.UnitMHz),
},
Instances: []*structs.NodeDevice{
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: true,
},
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: true,
},
},
}
nvidiaUnhealthy := &structs.NodeDeviceResource{
Vendor: "nvidia",
Type: "gpu",
Name: "1080ti",
Instances: []*structs.NodeDevice{
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: false,
},
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: false,
},
},
}
intel := &structs.NodeDeviceResource{
Vendor: "intel",
Type: "gpu",
Name: "GT640",
Instances: []*structs.NodeDevice{
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: true,
},
&structs.NodeDevice{
ID: uuid.Generate(),
Healthy: false,
},
},
}
cases := []struct {
Name string
Result bool
NodeDevices []*structs.NodeDeviceResource
RequestedDevices []*structs.RequestedDevice
}{
{
Name: "no devices on node",
Result: false,
NodeDevices: nil,
RequestedDevices: []*structs.RequestedDevice{gpuTypeReq},
},
{
Name: "no requested devices on empty node",
Result: true,
NodeDevices: nil,
RequestedDevices: nil,
},
{
Name: "gpu devices by type",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{gpuTypeReq},
},
{
Name: "wrong devices by type",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{fpgaTypeReq},
},
{
Name: "devices by type unhealthy node",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidiaUnhealthy},
RequestedDevices: []*structs.RequestedDevice{gpuTypeReq},
},
{
Name: "gpu devices by vendor/type",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{gpuVendorTypeReq},
},
{
Name: "wrong devices by vendor/type",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{fpgaVendorTypeReq},
},
{
Name: "gpu devices by vendor/type/model",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{gpuFullReq},
},
{
Name: "wrong devices by vendor/type/model",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{fpgaFullReq},
},
{
Name: "too many requested",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq},
},
{
Name: "request split over groups",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia, intel},
RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq},
},
{
Name: "meets constraints requirement",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 1,
Constraints: []*structs.Constraint{
{
Operand: "=",
LTarget: "${driver.model}",
RTarget: "1080ti",
},
{
Operand: ">",
LTarget: "${driver.attr.memory}",
RTarget: "1320.5 MB",
},
{
Operand: "<=",
LTarget: "${driver.attr.pci_bandwidth}",
RTarget: ".98 GiB/s",
},
{
Operand: "=",
LTarget: "${driver.attr.cores_clock}",
RTarget: "800MHz",
},
},
},
},
},
{
Name: "meets constraints requirement multiple count",
Result: true,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 2,
Constraints: []*structs.Constraint{
{
Operand: "=",
LTarget: "${driver.model}",
RTarget: "1080ti",
},
{
Operand: ">",
LTarget: "${driver.attr.memory}",
RTarget: "1320.5 MB",
},
{
Operand: "<=",
LTarget: "${driver.attr.pci_bandwidth}",
RTarget: ".98 GiB/s",
},
{
Operand: "=",
LTarget: "${driver.attr.cores_clock}",
RTarget: "800MHz",
},
},
},
},
},
{
Name: "meets constraints requirement over count",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 5,
Constraints: []*structs.Constraint{
{
Operand: "=",
LTarget: "${driver.model}",
RTarget: "1080ti",
},
{
Operand: ">",
LTarget: "${driver.attr.memory}",
RTarget: "1320.5 MB",
},
{
Operand: "<=",
LTarget: "${driver.attr.pci_bandwidth}",
RTarget: ".98 GiB/s",
},
{
Operand: "=",
LTarget: "${driver.attr.cores_clock}",
RTarget: "800MHz",
},
},
},
},
},
{
Name: "does not meet first constraint",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 1,
Constraints: []*structs.Constraint{
{
Operand: "=",
LTarget: "${driver.model}",
RTarget: "2080ti",
},
{
Operand: ">",
LTarget: "${driver.attr.memory}",
RTarget: "1320.5 MB",
},
{
Operand: "<=",
LTarget: "${driver.attr.pci_bandwidth}",
RTarget: ".98 GiB/s",
},
{
Operand: "=",
LTarget: "${driver.attr.cores_clock}",
RTarget: "800MHz",
},
},
},
},
},
{
Name: "does not meet second constraint",
Result: false,
NodeDevices: []*structs.NodeDeviceResource{nvidia},
RequestedDevices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 1,
Constraints: []*structs.Constraint{
{
Operand: "=",
LTarget: "${driver.model}",
RTarget: "1080ti",
},
{
Operand: "<",
LTarget: "${driver.attr.memory}",
RTarget: "1320.5 MB",
},
{
Operand: "<=",
LTarget: "${driver.attr.pci_bandwidth}",
RTarget: ".98 GiB/s",
},
{
Operand: "=",
LTarget: "${driver.attr.cores_clock}",
RTarget: "800MHz",
},
},
},
},
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
_, ctx := testContext(t)
checker := NewDeviceChecker(ctx)
checker.SetTaskGroup(getTg(c.RequestedDevices...))
if act := checker.Feasible(getNode(c.NodeDevices...)); act != c.Result {
t.Fatalf("got %v; want %v", act, c.Result)
}
})
}
}
func TestCheckAttributeConstraint(t *testing.T) {
type tcase struct {
op string
lVal, rVal *psstructs.Attribute
result bool
}
cases := []tcase{
{
op: "=",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("foo"),
result: true,
},
{
op: "is",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("foo"),
result: true,
},
{
op: "==",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("foo"),
result: true,
},
{
op: "!=",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("foo"),
result: false,
},
{
op: "!=",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("bar"),
result: true,
},
{
op: "not",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("bar"),
result: true,
},
{
op: structs.ConstraintVersion,
lVal: psstructs.NewStringAttribute("1.2.3"),
rVal: psstructs.NewStringAttribute("~> 1.0"),
result: true,
},
{
op: structs.ConstraintRegex,
lVal: psstructs.NewStringAttribute("foobarbaz"),
rVal: psstructs.NewStringAttribute("[\\w]+"),
result: true,
},
{
op: "<",
lVal: psstructs.NewStringAttribute("foo"),
rVal: psstructs.NewStringAttribute("bar"),
result: false,
},
{
op: structs.ConstraintSetContains,
lVal: psstructs.NewStringAttribute("foo,bar,baz"),
rVal: psstructs.NewStringAttribute("foo, bar "),
result: true,
},
{
op: structs.ConstraintSetContainsAll,
lVal: psstructs.NewStringAttribute("foo,bar,baz"),
rVal: psstructs.NewStringAttribute("foo, bar "),
result: true,
},
{
op: structs.ConstraintSetContains,
lVal: psstructs.NewStringAttribute("foo,bar,baz"),
rVal: psstructs.NewStringAttribute("foo,bam"),
result: false,
},
{
op: structs.ConstraintSetContainsAny,
lVal: psstructs.NewStringAttribute("foo,bar,baz"),
rVal: psstructs.NewStringAttribute("foo,bam"),
result: true,
},
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkAttributeConstraint(ctx, tc.op, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
}

View file

@ -48,6 +48,7 @@ type GenericStack struct {
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
taskGroupDevices *DeviceChecker
distinctHostsConstraint *DistinctHostsIterator
distinctPropertyConstraint *DistinctPropertyIterator
@ -87,12 +88,15 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)
// Filter on distinct host constraints.
@ -195,6 +199,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.distinctHostsConstraint.SetTaskGroup(tg)
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
@ -225,13 +230,16 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
// SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes.
type SystemStack struct {
ctx Context
source *StaticIterator
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
ctx Context
source *StaticIterator
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
taskGroupDevices *DeviceChecker
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
scoreNorm *ScoreNormalizationIterator
@ -259,12 +267,15 @@ func NewSystemStack(ctx Context) *SystemStack {
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)
// Filter on distinct property constraints.
@ -311,6 +322,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.binPack.SetTaskGroup(tg)

View file

@ -758,7 +758,10 @@ func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) {
// Create a new task group that prevents in-place updates.
tg := &structs.TaskGroup{}
*tg = *job.TaskGroups[0]
task := &structs.Task{Name: "FOO"}
task := &structs.Task{
Name: "FOO",
Resources: &structs.Resources{},
}
tg.Tasks = nil
tg.Tasks = append(tg.Tasks, task)