open-nomad/scheduler/feasible.go
Seth Hoenig 6b89527505 scheduler: enable upgrade path for bridge network fingerprint
This PR enables users of Nomad < 0.12 to upgrade to Nomad 0.12
and beyond. Nomad 0.12 introduced a network fingerprinter for
bridge networks, which is a constraint checked when a bridge
network is being used. If users upgrade servers first, as is
recommended, suddenly no clients running older versions of Nomad
will satisfy the bridge network resource constraint. Instead,
this change only enforces the constraint if the Nomad client
version is also >= 0.12.

Closes #8423
2020-11-13 14:17:01 -06:00


package scheduler
import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/nomad/structs"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)
const (
FilterConstraintHostVolumes = "missing compatible host volumes"
FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s"
FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s"
FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed"
FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims"
FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only"
FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims"
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
)
var (
// predatesBridgeFingerprint is a version constraint that matches versions of
// Nomad which predate the addition of the bridge network fingerprinter,
// added in Nomad v0.12
predatesBridgeFingerprint = mustBridgeConstraint()
)
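// mustBridgeConstraint parses the version constraint used to identify clients
// that predate the bridge network fingerprinter, panicking if the constraint
// string fails to parse.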
func mustBridgeConstraint() version.Constraints {
versionC, err := version.NewConstraint("< 0.12")
if err != nil {
panic(err)
}
return versionC
}
// FeasibleIterator is used to iteratively yield nodes that
// match feasibility constraints. The iterators may manage
// some state for performance optimizations.
type FeasibleIterator interface {
// Next yields a feasible node or nil if exhausted
Next() *structs.Node
// Reset is invoked when an allocation has been placed
// to reset any stale state.
Reset()
}
// ContextualIterator is an iterator that can have the job and task group set
// on it.
type ContextualIterator interface {
SetJob(*structs.Job)
SetTaskGroup(*structs.TaskGroup)
}
// FeasibilityChecker is used to check if a single node meets feasibility
// constraints.
type FeasibilityChecker interface {
Feasible(*structs.Node) bool
}
// StaticIterator is a FeasibleIterator which returns nodes
// in a static order. This is used at the base of the iterator
// chain only for testing due to deterministic behavior.
type StaticIterator struct {
ctx Context
nodes []*structs.Node
offset int
seen int
}
// NewStaticIterator constructs a static iterator from a list of nodes
func NewStaticIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
iter := &StaticIterator{
ctx: ctx,
nodes: nodes,
}
return iter
}
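// Next returns the next node in the static order, wrapping after a Reset, and
// returns nil once every node has been seen.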
func (iter *StaticIterator) Next() *structs.Node {
// Check if exhausted
n := len(iter.nodes)
if iter.offset == n || iter.seen == n {
if iter.seen != n { // seen has been Reset() to 0
iter.offset = 0
} else {
return nil
}
}
// Return the next offset, use this one
offset := iter.offset
iter.offset += 1
iter.seen += 1
iter.ctx.Metrics().EvaluateNode()
return iter.nodes[offset]
}
func (iter *StaticIterator) Reset() {
iter.seen = 0
}
func (iter *StaticIterator) SetNodes(nodes []*structs.Node) {
iter.nodes = nodes
iter.offset = 0
iter.seen = 0
}
// NewRandomIterator constructs a static iterator from a list of nodes
// after applying the Fisher-Yates algorithm for a random shuffle. This
// is applied in-place
func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// shuffle with the Fisher-Yates algorithm
shuffleNodes(nodes)
// Create a static iterator
return NewStaticIterator(ctx, nodes)
}
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
// volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are
// a slice because a single task group may request the same volume multiple times.
volumes map[string][]*structs.VolumeRequest
}
// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
ctx: ctx,
}
}
// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
lookupMap := make(map[string][]*structs.VolumeRequest)
// Convert the map from map[DesiredName]Request to map[Source][]Request to improve
// lookup performance. Also filter non-host volumes.
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue
}
lookupMap[req.Source] = append(lookupMap[req.Source], req)
}
h.volumes = lookupMap
}
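// Feasible returns whether the candidate node has the host volumes required by
// the task group, filtering the node if it does not.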
func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
if h.hasVolumes(candidate) {
return true
}
h.ctx.Metrics().FilterNode(candidate, FilterConstraintHostVolumes)
return false
}
func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
rLen := len(h.volumes)
hLen := len(n.HostVolumes)
// Fast path: Requested no volumes. No need to check further.
if rLen == 0 {
return true
}
// Fast path: Requesting more volumes than the node has, can't meet the criteria.
if rLen > hLen {
return false
}
for source, requests := range h.volumes {
nodeVolume, ok := n.HostVolumes[source]
if !ok {
return false
}
// If the volume supports being mounted as ReadWrite, we do not need to
// do further validation for readonly placement.
if !nodeVolume.ReadOnly {
continue
}
// The Volume can only be mounted ReadOnly, validate that no requests for
// it are ReadWrite.
for _, req := range requests {
if !req.ReadOnly {
return false
}
}
}
return true
}
type CSIVolumeChecker struct {
ctx Context
namespace string
jobID string
volumes map[string]*structs.VolumeRequest
}
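// NewCSIVolumeChecker creates a CSIVolumeChecker; the job ID, namespace, and
// requested volumes are provided through the corresponding setters.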
func NewCSIVolumeChecker(ctx Context) *CSIVolumeChecker {
return &CSIVolumeChecker{
ctx: ctx,
}
}
func (c *CSIVolumeChecker) SetJobID(jobID string) {
c.jobID = jobID
}
func (c *CSIVolumeChecker) SetNamespace(namespace string) {
c.namespace = namespace
}
func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
xs := make(map[string]*structs.VolumeRequest)
// Filter to only CSI Volumes
for alias, req := range volumes {
if req.Type != structs.VolumeTypeCSI {
continue
}
xs[alias] = req
}
c.volumes = xs
}
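// Feasible returns whether the node can satisfy every requested CSI volume,
// filtering the node with a specific failure reason if it cannot.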
func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
hasPlugins, failReason := c.hasPlugins(n)
if hasPlugins {
return true
}
c.ctx.Metrics().FilterNode(n, failReason)
return false
}
func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
// We can mount the volume if
// - if required, a healthy controller plugin is running the driver
// - the volume has free claims, or this job owns the claims
// - this node is running the node plugin, implies matching topology
// Fast path: Requested no volumes. No need to check further.
if len(c.volumes) == 0 {
return true, ""
}
ws := memdb.NewWatchSet()
// Find the count per plugin for this node, so that we can enforce MaxVolumes
pluginCount := map[string]int64{}
iter, err := c.ctx.State().CSIVolumesByNodeID(ws, n.ID)
if err != nil {
return false, FilterConstraintCSIVolumesLookupFailed
}
for {
raw := iter.Next()
if raw == nil {
break
}
vol, ok := raw.(*structs.CSIVolume)
if !ok {
continue
}
pluginCount[vol.PluginID] += 1
}
// For volume requests, find volumes and determine feasibility
for _, req := range c.volumes {
vol, err := c.ctx.State().CSIVolumeByID(ws, c.namespace, req.Source)
if err != nil {
return false, FilterConstraintCSIVolumesLookupFailed
}
if vol == nil {
return false, fmt.Sprintf(FilterConstraintCSIVolumeNotFoundTemplate, req.Source)
}
// Check that this node has a healthy running plugin with the right PluginID
plugin, ok := n.CSINodePlugins[vol.PluginID]
if !ok {
return false, fmt.Sprintf(FilterConstraintCSIPluginTemplate, vol.PluginID, n.ID)
}
if !plugin.Healthy {
return false, fmt.Sprintf(FilterConstraintCSIPluginUnhealthyTemplate, vol.PluginID, n.ID)
}
if pluginCount[vol.PluginID] >= plugin.NodeInfo.MaxVolumes {
return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID)
}
if req.ReadOnly {
if !vol.ReadSchedulable() {
return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID)
}
} else {
if !vol.WriteSchedulable() {
return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
}
if vol.WriteFreeClaims() {
return true, ""
}
// Check the blocking allocations to see if they belong to this job
for id := range vol.WriteAllocs {
a, err := c.ctx.State().AllocByID(ws, id)
if err != nil || a == nil || a.Namespace != c.namespace || a.JobID != c.jobID {
return false, fmt.Sprintf(FilterConstraintCSIVolumeInUseTemplate, vol.ID)
}
}
}
}
return true, ""
}
// NetworkChecker is a FeasibilityChecker which returns whether a node has the
// network resources necessary to schedule the task group
type NetworkChecker struct {
ctx Context
networkMode string
ports []structs.Port
}
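// NewNetworkChecker creates a NetworkChecker which defaults to the "host"
// network mode.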
func NewNetworkChecker(ctx Context) *NetworkChecker {
return &NetworkChecker{ctx: ctx, networkMode: "host"}
}
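// SetNetwork records the network mode and ports requested by the task group so
// they can be checked against each candidate node.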
func (c *NetworkChecker) SetNetwork(network *structs.NetworkResource) {
c.networkMode = network.Mode
if c.networkMode == "" {
c.networkMode = "host"
}
// Collect every requested port; preallocate capacity only, so that append
// does not leave zero-valued entries at the front of the slice.
c.ports = make([]structs.Port, 0, len(network.DynamicPorts)+len(network.ReservedPorts))
c.ports = append(c.ports, network.DynamicPorts...)
c.ports = append(c.ports, network.ReservedPorts...)
}
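// Feasible returns whether the node supports the requested network mode and
// host networks. Clients older than 0.12 are allowed through for bridge
// networking because they predate the bridge network fingerprinter.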
func (c *NetworkChecker) Feasible(option *structs.Node) bool {
if !c.hasNetwork(option) {
// special case - if the client is running a version older than 0.12 but
// the server is 0.12 or newer, we need to maintain an upgrade path for
// jobs looking for a bridge network that will not have been fingerprinted
// on the client (which was added in 0.12)
if c.networkMode == "bridge" {
sv, err := version.NewSemver(option.Attributes["nomad.version"])
if err == nil && predatesBridgeFingerprint.Check(sv) {
return true
}
}
c.ctx.Metrics().FilterNode(option, "missing network")
return false
}
if c.ports != nil {
if !c.hasHostNetworks(option) {
return false
}
}
return true
}
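// hasHostNetworks returns whether the node has a network alias for every port
// that requests a specific host network.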
func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool {
for _, port := range c.ports {
if port.HostNetwork != "" {
found := false
for _, net := range option.NodeResources.NodeNetworks {
if net.HasAlias(port.HostNetwork) {
found = true
break
}
}
if !found {
c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", port.HostNetwork, port.Label))
return false
}
}
}
return true
}
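// hasNetwork returns whether the node has fingerprinted a network whose mode
// matches the requested network mode.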
func (c *NetworkChecker) hasNetwork(option *structs.Node) bool {
if option.NodeResources == nil {
return false
}
for _, nw := range option.NodeResources.Networks {
mode := nw.Mode
if mode == "" {
mode = "host"
}
if mode == c.networkMode {
return true
}
}
return false
}
// DriverChecker is a FeasibilityChecker which returns whether a node has the
// drivers necessary to schedule a task group.
type DriverChecker struct {
ctx Context
drivers map[string]struct{}
}
// NewDriverChecker creates a DriverChecker from a set of drivers
func NewDriverChecker(ctx Context, drivers map[string]struct{}) *DriverChecker {
return &DriverChecker{
ctx: ctx,
drivers: drivers,
}
}
func (c *DriverChecker) SetDrivers(d map[string]struct{}) {
c.drivers = d
}
func (c *DriverChecker) Feasible(option *structs.Node) bool {
// Use this node if possible
if c.hasDrivers(option) {
return true
}
c.ctx.Metrics().FilterNode(option, FilterConstraintDrivers)
return false
}
// hasDrivers is used to check if the node has all the appropriate
// drivers for this task group. Drivers are registered as node attribute
// like "driver.docker=1" with their corresponding version.
func (c *DriverChecker) hasDrivers(option *structs.Node) bool {
for driver := range c.drivers {
driverStr := fmt.Sprintf("driver.%s", driver)
// COMPAT: Remove in 0.10: As of Nomad 0.8, nodes have a DriverInfo that
// corresponds with every driver. As a Nomad server might be on a later
// version than a Nomad client, we need to check for compatibility here
// to verify the client supports this.
if driverInfo, ok := option.Drivers[driver]; ok {
if driverInfo == nil {
c.ctx.Logger().Named("driver_checker").Warn("node has no driver info set", "node_id", option.ID, "driver", driver)
return false
}
if driverInfo.Detected && driverInfo.Healthy {
continue
} else {
return false
}
}
value, ok := option.Attributes[driverStr]
if !ok {
return false
}
enabled, err := strconv.ParseBool(value)
if err != nil {
c.ctx.Logger().Named("driver_checker").Warn("node has invalid driver setting", "node_id", option.ID, "driver", driver, "val", value)
return false
}
if !enabled {
return false
}
}
return true
}
// DistinctHostsIterator is a FeasibleIterator which returns nodes that pass the
// distinct_hosts constraint. The constraint ensures that multiple allocations
// do not exist on the same node.
type DistinctHostsIterator struct {
ctx Context
source FeasibleIterator
tg *structs.TaskGroup
job *structs.Job
// Store whether the Job or TaskGroup has a distinct_hosts constraints so
// they don't have to be calculated every time Next() is called.
tgDistinctHosts bool
jobDistinctHosts bool
}
// NewDistinctHostsIterator creates a DistinctHostsIterator from a source.
func NewDistinctHostsIterator(ctx Context, source FeasibleIterator) *DistinctHostsIterator {
return &DistinctHostsIterator{
ctx: ctx,
source: source,
}
}
func (iter *DistinctHostsIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
}
func (iter *DistinctHostsIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
}
func (iter *DistinctHostsIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
for _, con := range constraints {
if con.Operand == structs.ConstraintDistinctHosts {
return true
}
}
return false
}
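// Next returns the next node from the source that satisfies any distinct_hosts
// constraint on the job or task group.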
func (iter *DistinctHostsIterator) Next() *structs.Node {
for {
// Get the next option from the source
option := iter.source.Next()
// Hot-path if the option is nil or there are no distinct_hosts or
// distinct_property constraints.
hosts := iter.jobDistinctHosts || iter.tgDistinctHosts
if option == nil || !hosts {
return option
}
// Check if the host constraints are satisfied
if !iter.satisfiesDistinctHosts(option) {
iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts)
continue
}
return option
}
}
// satisfiesDistinctHosts checks if the node satisfies a distinct_hosts
// constraint either specified at the job level or the TaskGroup level.
func (iter *DistinctHostsIterator) satisfiesDistinctHosts(option *structs.Node) bool {
// Check if there is no constraint set.
if !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
return true
}
// Get the proposed allocations
proposed, err := iter.ctx.ProposedAllocs(option.ID)
if err != nil {
iter.ctx.Logger().Named("distinct_hosts").Error("failed to get proposed allocations", "error", err)
return false
}
// Skip the node if the task group has already been allocated on it.
for _, alloc := range proposed {
// If the job has a distinct_hosts constraint we only need an alloc
// collision on the JobID but if the constraint is on the TaskGroup then
// we need both a job and TaskGroup collision.
jobCollision := alloc.JobID == iter.job.ID
taskCollision := alloc.TaskGroup == iter.tg.Name
if iter.jobDistinctHosts && jobCollision || jobCollision && taskCollision {
return false
}
}
return true
}
func (iter *DistinctHostsIterator) Reset() {
iter.source.Reset()
}
// DistinctPropertyIterator is a FeasibleIterator which returns nodes that pass the
// distinct_property constraint. The constraint ensures that multiple allocations
// do not use the same value of the given property.
type DistinctPropertyIterator struct {
ctx Context
source FeasibleIterator
tg *structs.TaskGroup
job *structs.Job
hasDistinctPropertyConstraints bool
jobPropertySets []*propertySet
groupPropertySets map[string][]*propertySet
}
// NewDistinctPropertyIterator creates a DistinctPropertyIterator from a source.
func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *DistinctPropertyIterator {
return &DistinctPropertyIterator{
ctx: ctx,
source: source,
groupPropertySets: make(map[string][]*propertySet),
}
}
func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
// Build the property set at the taskgroup level
if _, ok := iter.groupPropertySets[tg.Name]; !ok {
for _, c := range tg.Constraints {
if c.Operand != structs.ConstraintDistinctProperty {
continue
}
pset := NewPropertySet(iter.ctx, iter.job)
pset.SetTGConstraint(c, tg.Name)
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
}
}
// Check if there is a distinct property
iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0
}
func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) {
iter.job = job
// Build the property set at the job level
for _, c := range job.Constraints {
if c.Operand != structs.ConstraintDistinctProperty {
continue
}
pset := NewPropertySet(iter.ctx, job)
pset.SetJobConstraint(c)
iter.jobPropertySets = append(iter.jobPropertySets, pset)
}
}
func (iter *DistinctPropertyIterator) Next() *structs.Node {
for {
// Get the next option from the source
option := iter.source.Next()
// Hot path if there is nothing to check
if option == nil || !iter.hasDistinctPropertyConstraints {
return option
}
// Check if the constraints are met
if !iter.satisfiesProperties(option, iter.jobPropertySets) ||
!iter.satisfiesProperties(option, iter.groupPropertySets[iter.tg.Name]) {
continue
}
return option
}
}
// satisfiesProperties returns whether the option satisfies the set of
// properties. If not it will be filtered.
func (iter *DistinctPropertyIterator) satisfiesProperties(option *structs.Node, set []*propertySet) bool {
for _, ps := range set {
if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies {
iter.ctx.Metrics().FilterNode(option, reason)
return false
}
}
return true
}
func (iter *DistinctPropertyIterator) Reset() {
iter.source.Reset()
for _, ps := range iter.jobPropertySets {
ps.PopulateProposed()
}
for _, sets := range iter.groupPropertySets {
for _, ps := range sets {
ps.PopulateProposed()
}
}
}
// ConstraintChecker is a FeasibilityChecker which returns nodes that match a
// given set of constraints. This is used to filter on job, task group, and task
// constraints.
type ConstraintChecker struct {
ctx Context
constraints []*structs.Constraint
}
// NewConstraintChecker creates a ConstraintChecker for a set of constraints
func NewConstraintChecker(ctx Context, constraints []*structs.Constraint) *ConstraintChecker {
return &ConstraintChecker{
ctx: ctx,
constraints: constraints,
}
}
func (c *ConstraintChecker) SetConstraints(constraints []*structs.Constraint) {
c.constraints = constraints
}
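// Feasible returns whether the node meets every constraint, filtering the node
// with the first constraint that fails.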
func (c *ConstraintChecker) Feasible(option *structs.Node) bool {
// Use this node if possible
for _, constraint := range c.constraints {
if !c.meetsConstraint(constraint, option) {
c.ctx.Metrics().FilterNode(option, constraint.String())
return false
}
}
return true
}
func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool {
// Resolve the targets. Targets that are not present are treated as `nil`.
// This is to allow for matching constraints where a target is not present.
lVal, lOk := resolveTarget(constraint.LTarget, option)
rVal, rOk := resolveTarget(constraint.RTarget, option)
// Check if satisfied
return checkConstraint(c.ctx, constraint.Operand, lVal, rVal, lOk, rOk)
}
// resolveTarget is used to resolve the LTarget and RTarget of a Constraint.
func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return target, true
}
// Handle the interpolations
switch {
case "${node.unique.id}" == target:
return node.ID, true
case "${node.datacenter}" == target:
return node.Datacenter, true
case "${node.unique.name}" == target:
return node.Name, true
case "${node.class}" == target:
return node.NodeClass, true
case strings.HasPrefix(target, "${attr."):
attr := strings.TrimSuffix(strings.TrimPrefix(target, "${attr."), "}")
val, ok := node.Attributes[attr]
return val, ok
case strings.HasPrefix(target, "${meta."):
meta := strings.TrimSuffix(strings.TrimPrefix(target, "${meta."), "}")
val, ok := node.Meta[meta]
return val, ok
default:
return nil, false
}
}
// checkConstraint checks if a constraint is satisfied. The lVal and rVal
// interfaces may be nil.
func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool {
// Check for constraints not handled by this checker.
switch operand {
case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty:
return true
default:
break
}
switch operand {
case "=", "==", "is":
return lFound && rFound && reflect.DeepEqual(lVal, rVal)
case "!=", "not":
return !reflect.DeepEqual(lVal, rVal)
case "<", "<=", ">", ">=":
return lFound && rFound && checkLexicalOrder(operand, lVal, rVal)
case structs.ConstraintAttributeIsSet:
return lFound
case structs.ConstraintAttributeIsNotSet:
return !lFound
case structs.ConstraintVersion:
parser := newVersionConstraintParser(ctx)
return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal)
case structs.ConstraintSemver:
parser := newSemverConstraintParser(ctx)
return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal)
case structs.ConstraintRegex:
return lFound && rFound && checkRegexpMatch(ctx, lVal, rVal)
case structs.ConstraintSetContains, structs.ConstraintSetContainsAll:
return lFound && rFound && checkSetContainsAll(ctx, lVal, rVal)
case structs.ConstraintSetContainsAny:
return lFound && rFound && checkSetContainsAny(lVal, rVal)
default:
return false
}
}
// checkAffinity checks if a specific affinity is satisfied
func checkAffinity(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool {
return checkConstraint(ctx, operand, lVal, rVal, lFound, rFound)
}
// checkAttributeAffinity checks if an affinity is satisfied
func checkAttributeAffinity(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool {
return checkAttributeConstraint(ctx, operand, lVal, rVal, lFound, rFound)
}
// checkLexicalOrder is used to check for lexical ordering
func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
// Ensure the values are strings
lStr, ok := lVal.(string)
if !ok {
return false
}
rStr, ok := rVal.(string)
if !ok {
return false
}
switch op {
case "<":
return lStr < rStr
case "<=":
return lStr <= rStr
case ">":
return lStr > rStr
case ">=":
return lStr >= rStr
default:
return false
}
}
// checkVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkVersionMatch(ctx Context, parse verConstraintParser, lVal, rVal interface{}) bool {
// Parse the version
var versionStr string
switch v := lVal.(type) {
case string:
versionStr = v
case int:
versionStr = fmt.Sprintf("%d", v)
default:
return false
}
// Parse the version
vers, err := version.NewVersion(versionStr)
if err != nil {
return false
}
// Constraint must be a string
constraintStr, ok := rVal.(string)
if !ok {
return false
}
// Parse the constraints
constraints := parse(constraintStr)
if constraints == nil {
return false
}
// Check the constraints against the version
return constraints.Check(vers)
}
// checkAttributeVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkAttributeVersionMatch(ctx Context, parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool {
// Parse the version
var versionStr string
if s, ok := lVal.GetString(); ok {
versionStr = s
} else if i, ok := lVal.GetInt(); ok {
versionStr = fmt.Sprintf("%d", i)
} else {
return false
}
// Parse the version
vers, err := version.NewVersion(versionStr)
if err != nil {
return false
}
// Constraint must be a string
constraintStr, ok := rVal.GetString()
if !ok {
return false
}
// Parse the constraints
constraints := parse(constraintStr)
if constraints == nil {
return false
}
// Check the constraints against the version
return constraints.Check(vers)
}
// checkRegexpMatch is used to compare a value on the
// left hand side with a regexp on the right hand side
func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
return false
}
// Regexp must be a string
regexpStr, ok := rVal.(string)
if !ok {
return false
}
// Check the cache
cache := ctx.RegexpCache()
re := cache[regexpStr]
// Parse the regexp
if re == nil {
var err error
re, err = regexp.Compile(regexpStr)
if err != nil {
return false
}
cache[regexpStr] = re
}
// Look for a match
return re.MatchString(lStr)
}
// checkSetContainsAll is used to see if the left hand side contains the
// string on the right hand side
func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
return false
}
// RHS must be a string
rStr, ok := rVal.(string)
if !ok {
return false
}
input := strings.Split(lStr, ",")
lookup := make(map[string]struct{}, len(input))
for _, in := range input {
cleaned := strings.TrimSpace(in)
lookup[cleaned] = struct{}{}
}
for _, r := range strings.Split(rStr, ",") {
cleaned := strings.TrimSpace(r)
if _, ok := lookup[cleaned]; !ok {
return false
}
}
return true
}
// checkSetContainsAny is used to see if the left hand side contains any
// values on the right hand side
func checkSetContainsAny(lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
return false
}
// RHS must be a string
rStr, ok := rVal.(string)
if !ok {
return false
}
input := strings.Split(lStr, ",")
lookup := make(map[string]struct{}, len(input))
for _, in := range input {
cleaned := strings.TrimSpace(in)
lookup[cleaned] = struct{}{}
}
for _, r := range strings.Split(rStr, ",") {
cleaned := strings.TrimSpace(r)
if _, ok := lookup[cleaned]; ok {
return true
}
}
return false
}
// FeasibilityWrapper is a FeasibleIterator which wraps both job and task group
// FeasibilityCheckers, allowing feasibility checking to be skipped if the
// computed node class has previously been marked as eligible or ineligible.
type FeasibilityWrapper struct {
ctx Context
source FeasibleIterator
jobCheckers []FeasibilityChecker
tgCheckers []FeasibilityChecker
tgAvailable []FeasibilityChecker
tg string
}
// NewFeasibilityWrapper returns a FeasibleIterator based on the passed source
// and FeasibilityCheckers.
func NewFeasibilityWrapper(ctx Context, source FeasibleIterator,
jobCheckers, tgCheckers, tgAvailable []FeasibilityChecker) *FeasibilityWrapper {
return &FeasibilityWrapper{
ctx: ctx,
source: source,
jobCheckers: jobCheckers,
tgCheckers: tgCheckers,
tgAvailable: tgAvailable,
}
}
func (w *FeasibilityWrapper) SetTaskGroup(tg string) {
w.tg = tg
}
func (w *FeasibilityWrapper) Reset() {
w.source.Reset()
}
// Next returns an eligible node, only running the FeasibilityCheckers as needed
// based on the source's computed node class.
func (w *FeasibilityWrapper) Next() *structs.Node {
evalElig := w.ctx.Eligibility()
metrics := w.ctx.Metrics()
OUTER:
for {
// Get the next option from the source
option := w.source.Next()
if option == nil {
return nil
}
// Check if the job has been marked as eligible or ineligible.
jobEscaped, jobUnknown := false, false
switch evalElig.JobStatus(option.ComputedClass) {
case EvalComputedClassIneligible:
// Fast path the ineligible case
metrics.FilterNode(option, "computed class ineligible")
continue
case EvalComputedClassEscaped:
jobEscaped = true
case EvalComputedClassUnknown:
jobUnknown = true
}
// Run the job feasibility checks.
for _, check := range w.jobCheckers {
feasible := check.Feasible(option)
if !feasible {
// If the job hasn't escaped, set it to be ineligible since it
// failed a job check.
if !jobEscaped {
evalElig.SetJobEligibility(false, option.ComputedClass)
}
continue OUTER
}
}
// Set the job eligibility if the constraints weren't escaped and it
// hasn't been set before.
if !jobEscaped && jobUnknown {
evalElig.SetJobEligibility(true, option.ComputedClass)
}
// Check if the task group has been marked as eligible or ineligible.
tgEscaped, tgUnknown := false, false
switch evalElig.TaskGroupStatus(w.tg, option.ComputedClass) {
case EvalComputedClassIneligible:
// Fast path the ineligible case
metrics.FilterNode(option, "computed class ineligible")
continue
case EvalComputedClassEligible:
// Fast path the eligible case
if w.available(option) {
return option
}
// We match the class but are temporarily unavailable, the eval
// should be blocked
return nil
case EvalComputedClassEscaped:
tgEscaped = true
case EvalComputedClassUnknown:
tgUnknown = true
}
// Run the task group feasibility checks.
for _, check := range w.tgCheckers {
feasible := check.Feasible(option)
if !feasible {
// If the task group hasn't escaped, set it to be ineligible
// since it failed a check.
if !tgEscaped {
evalElig.SetTaskGroupEligibility(false, w.tg, option.ComputedClass)
}
continue OUTER
}
}
// Set the task group eligibility if the constraints weren't escaped and
// it hasn't been set before.
if !tgEscaped && tgUnknown {
evalElig.SetTaskGroupEligibility(true, w.tg, option.ComputedClass)
}
// tgAvailable handlers are available transiently, so we test them without
// affecting the computed class
if !w.available(option) {
continue OUTER
}
return option
}
}
// available checks transient feasibility checkers which depend on changing conditions,
// e.g. the health status of a plugin or driver
func (w *FeasibilityWrapper) available(option *structs.Node) bool {
// If we don't have any availability checks, we're available
if len(w.tgAvailable) == 0 {
return true
}
for _, check := range w.tgAvailable {
if !check.Feasible(option) {
return false
}
}
return true
}
// DeviceChecker is a FeasibilityChecker which returns whether a node has the
// devices necessary to schedule a task group.
type DeviceChecker struct {
ctx Context
// required is the set of requested devices that must exist on the node
required []*structs.RequestedDevice
// requiresDevices indicates if the task group requires devices
requiresDevices bool
}
// NewDeviceChecker creates a DeviceChecker
func NewDeviceChecker(ctx Context) *DeviceChecker {
return &DeviceChecker{
ctx: ctx,
}
}
func (c *DeviceChecker) SetTaskGroup(tg *structs.TaskGroup) {
c.required = nil
for _, task := range tg.Tasks {
c.required = append(c.required, task.Resources.Devices...)
}
c.requiresDevices = len(c.required) != 0
}
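// Feasible returns whether the node has the devices required by the task
// group, filtering the node if it does not.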
func (c *DeviceChecker) Feasible(option *structs.Node) bool {
if c.hasDevices(option) {
return true
}
c.ctx.Metrics().FilterNode(option, FilterConstraintDevices)
return false
}
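// hasDevices returns whether the node can satisfy every device request,
// accounting for the number of healthy instances of each matching device.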
func (c *DeviceChecker) hasDevices(option *structs.Node) bool {
if !c.requiresDevices {
return true
}
// COMPAT(0.11): Remove in 0.11
// The node does not have the new resources object so it can not have any
// devices
if option.NodeResources == nil {
return false
}
// Check if the node has any devices
nodeDevs := option.NodeResources.Devices
if len(nodeDevs) == 0 {
return false
}
// Create a mapping of node devices to the remaining count
available := make(map[*structs.NodeDeviceResource]uint64, len(nodeDevs))
for _, d := range nodeDevs {
var healthy uint64 = 0
for _, instance := range d.Instances {
if instance.Healthy {
healthy++
}
}
if healthy != 0 {
available[d] = healthy
}
}
// Go through the required devices trying to find matches
OUTER:
for _, req := range c.required {
// Determine how many there are to place
desiredCount := req.Count
// Go through the device resources and see if we have a match
for d, unused := range available {
if unused == 0 {
// Depleted
continue
}
// First check we have enough instances of the device since this is
// cheaper than checking the constraints
if unused < desiredCount {
continue
}
// Check the constraints
if nodeDeviceMatches(c.ctx, d, req) {
// Consume the instances
available[d] -= desiredCount
// Move on to the next request
continue OUTER
}
}
// We couldn't match the request for the device
return false
}
// Only satisfied if there are no more devices to place
return true
}
// nodeDeviceMatches checks if the device matches the request and its
// constraints. It doesn't check the count.
func nodeDeviceMatches(ctx Context, d *structs.NodeDeviceResource, req *structs.RequestedDevice) bool {
if !d.ID().Matches(req.ID()) {
return false
}
// There are no constraints to consider
if len(req.Constraints) == 0 {
return true
}
for _, c := range req.Constraints {
// Resolve the targets
lVal, lOk := resolveDeviceTarget(c.LTarget, d)
rVal, rOk := resolveDeviceTarget(c.RTarget, d)
// Check if satisfied
if !checkAttributeConstraint(ctx, c.Operand, lVal, rVal, lOk, rOk) {
return false
}
}
return true
}
// resolveDeviceTarget is used to resolve the LTarget and RTarget of a Constraint
// when used on a device
func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstructs.Attribute, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return psstructs.ParseAttribute(target), true
}
// Handle the interpolations
switch {
case "${device.model}" == target:
return psstructs.NewStringAttribute(d.Name), true
case "${device.vendor}" == target:
return psstructs.NewStringAttribute(d.Vendor), true
case "${device.type}" == target:
return psstructs.NewStringAttribute(d.Type), true
case strings.HasPrefix(target, "${device.attr."):
attr := strings.TrimPrefix(target, "${device.attr.")
attr = strings.TrimSuffix(attr, "}")
val, ok := d.Attributes[attr]
return val, ok
default:
return nil, false
}
}
// checkAttributeConstraint checks if a constraint is satisfied. nil equality
// comparisons are considered to be false.
func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool {
// Check for constraints not handled by this checker.
switch operand {
case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty:
return true
default:
break
}
switch operand {
case "!=", "not":
// Neither value was provided, nil != nil == false
if !(lFound || rFound) {
return false
}
// Only 1 value was provided, therefore nil != some == true
if lFound != rFound {
return true
}
// Both values were provided, so actually compare them
v, ok := lVal.Compare(rVal)
if !ok {
return false
}
return v != 0
case "<", "<=", ">", ">=", "=", "==", "is":
if !(lFound && rFound) {
return false
}
v, ok := lVal.Compare(rVal)
if !ok {
return false
}
switch operand {
case "is", "==", "=":
return v == 0
case "<":
return v == -1
case "<=":
return v != 1
case ">":
return v == 1
case ">=":
return v != -1
default:
return false
}
case structs.ConstraintVersion:
if !(lFound && rFound) {
return false
}
parser := newVersionConstraintParser(ctx)
return checkAttributeVersionMatch(ctx, parser, lVal, rVal)
case structs.ConstraintSemver:
if !(lFound && rFound) {
return false
}
parser := newSemverConstraintParser(ctx)
return checkAttributeVersionMatch(ctx, parser, lVal, rVal)
case structs.ConstraintRegex:
if !(lFound && rFound) {
return false
}
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkRegexpMatch(ctx, ls, rs)
case structs.ConstraintSetContains, structs.ConstraintSetContainsAll:
if !(lFound && rFound) {
return false
}
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkSetContainsAll(ctx, ls, rs)
case structs.ConstraintSetContainsAny:
if !(lFound && rFound) {
return false
}
ls, ok := lVal.GetString()
rs, ok2 := rVal.GetString()
if !ok || !ok2 {
return false
}
return checkSetContainsAny(ls, rs)
case structs.ConstraintAttributeIsSet:
return lFound
case structs.ConstraintAttributeIsNotSet:
return !lFound
default:
return false
}
}
// VerConstraints is the interface implemented by both go-version constraints
// and semver constraints.
type VerConstraints interface {
Check(v *version.Version) bool
String() string
}
// verConstraintParser returns a version constraints implementation (go-version
// or semver).
type verConstraintParser func(verConstraint string) VerConstraints
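// newVersionConstraintParser returns a parser for go-version constraints that
// caches parsed constraints in the scheduling context.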
func newVersionConstraintParser(ctx Context) verConstraintParser {
cache := ctx.VersionConstraintCache()
return func(cstr string) VerConstraints {
if c := cache[cstr]; c != nil {
return c
}
constraints, err := version.NewConstraint(cstr)
if err != nil {
return nil
}
cache[cstr] = constraints
return constraints
}
}
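// newSemverConstraintParser returns a parser for strict semver constraints
// that caches parsed constraints in the scheduling context.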
func newSemverConstraintParser(ctx Context) verConstraintParser {
cache := ctx.SemverConstraintCache()
return func(cstr string) VerConstraints {
if c := cache[cstr]; c != nil {
return c
}
constraints, err := semver.NewConstraint(cstr)
if err != nil {
return nil
}
cache[cstr] = constraints
return constraints
}
}