package scheduler import ( "fmt" "reflect" "regexp" "strconv" "strings" memdb "github.com/hashicorp/go-memdb" version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/constraints/semver" "github.com/hashicorp/nomad/nomad/structs" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) const ( FilterConstraintHostVolumes = "missing compatible host volumes" FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" FilterConstraintDrivers = "missing drivers" FilterConstraintDevices = "missing devices" FilterConstraintsCSIPluginTopology = "did not meet topology requirement" ) var ( // predatesBridgeFingerprint returns true if the constraint matches a version // of nomad that predates the addition of the bridge network finger-printer, // which was added in Nomad v0.12 predatesBridgeFingerprint = mustBridgeConstraint() ) func mustBridgeConstraint() version.Constraints { versionC, err := version.NewConstraint("< 0.12") if err != nil { panic(err) } return versionC } // FeasibleIterator is used to iteratively yield nodes that // match feasibility constraints. The iterators may manage // some state for performance optimizations. type FeasibleIterator interface { // Next yields a feasible node or nil if exhausted Next() *structs.Node // Reset is invoked when an allocation has been placed // to reset any stale state. Reset() } // JobContextualIterator is an iterator that can have the job and task group set // on it. type ContextualIterator interface { SetJob(*structs.Job) SetTaskGroup(*structs.TaskGroup) } // FeasibilityChecker is used to check if a single node meets feasibility // constraints. type FeasibilityChecker interface { Feasible(*structs.Node) bool } // StaticIterator is a FeasibleIterator which returns nodes // in a static order. This is used at the base of the iterator // chain only for testing due to deterministic behavior. type StaticIterator struct { ctx Context nodes []*structs.Node offset int seen int } // NewStaticIterator constructs a random iterator from a list of nodes func NewStaticIterator(ctx Context, nodes []*structs.Node) *StaticIterator { iter := &StaticIterator{ ctx: ctx, nodes: nodes, } return iter } func (iter *StaticIterator) Next() *structs.Node { // Check if exhausted n := len(iter.nodes) if iter.offset == n || iter.seen == n { if iter.seen != n { // seen has been Reset() to 0 iter.offset = 0 } else { return nil } } // Return the next offset, use this one offset := iter.offset iter.offset += 1 iter.seen += 1 iter.ctx.Metrics().EvaluateNode() return iter.nodes[offset] } func (iter *StaticIterator) Reset() { iter.seen = 0 } func (iter *StaticIterator) SetNodes(nodes []*structs.Node) { iter.nodes = nodes iter.offset = 0 iter.seen = 0 } // NewRandomIterator constructs a static iterator from a list of nodes // after applying the Fisher-Yates algorithm for a random shuffle. This // is applied in-place func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // shuffle with the Fisher-Yates algorithm idx, _ := ctx.State().LatestIndex() shuffleNodes(ctx.Plan(), idx, nodes) // Create a static iterator return NewStaticIterator(ctx, nodes) } // HostVolumeChecker is a FeasibilityChecker which returns whether a node has // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { ctx Context // volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are // a slice because a single task group may request the same volume multiple times. volumes map[string][]*structs.VolumeRequest } // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { return &HostVolumeChecker{ ctx: ctx, } } // SetVolumes takes the volumes required by a task group and updates the checker. func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) { lookupMap := make(map[string][]*structs.VolumeRequest) // Convert the map from map[DesiredName]Request to map[Source][]Request to improve // lookup performance. Also filter non-host volumes. for _, req := range volumes { if req.Type != structs.VolumeTypeHost { continue } lookupMap[req.Source] = append(lookupMap[req.Source], req) } h.volumes = lookupMap } func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { if h.hasVolumes(candidate) { return true } h.ctx.Metrics().FilterNode(candidate, FilterConstraintHostVolumes) return false } func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { rLen := len(h.volumes) hLen := len(n.HostVolumes) // Fast path: Requested no volumes. No need to check further. if rLen == 0 { return true } // Fast path: Requesting more volumes than the node has, can't meet the criteria. if rLen > hLen { return false } for source, requests := range h.volumes { nodeVolume, ok := n.HostVolumes[source] if !ok { return false } // If the volume supports being mounted as ReadWrite, we do not need to // do further validation for readonly placement. if !nodeVolume.ReadOnly { continue } // The Volume can only be mounted ReadOnly, validate that no requests for // it are ReadWrite. for _, req := range requests { if !req.ReadOnly { return false } } } return true } type CSIVolumeChecker struct { ctx Context namespace string jobID string volumes map[string]*structs.VolumeRequest } func NewCSIVolumeChecker(ctx Context) *CSIVolumeChecker { return &CSIVolumeChecker{ ctx: ctx, } } func (c *CSIVolumeChecker) SetJobID(jobID string) { c.jobID = jobID } func (c *CSIVolumeChecker) SetNamespace(namespace string) { c.namespace = namespace } func (c *CSIVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) { xs := make(map[string]*structs.VolumeRequest) // Filter to only CSI Volumes for alias, req := range volumes { if req.Type != structs.VolumeTypeCSI { continue } if req.PerAlloc { // provide a unique volume source per allocation copied := req.Copy() copied.Source = copied.Source + structs.AllocSuffix(allocName) xs[alias] = copied } else { xs[alias] = req } } c.volumes = xs } func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool { ok, failReason := c.isFeasible(n) if ok { return true } c.ctx.Metrics().FilterNode(n, failReason) return false } func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { // We can mount the volume if // - if required, a healthy controller plugin is running the driver // - the volume has free claims, or this job owns the claims // - this node is running the node plugin, implies matching topology // Fast path: Requested no volumes. No need to check further. if len(c.volumes) == 0 { return true, "" } ws := memdb.NewWatchSet() // Find the count per plugin for this node, so that can enforce MaxVolumes pluginCount := map[string]int64{} iter, err := c.ctx.State().CSIVolumesByNodeID(ws, "", n.ID) if err != nil { return false, FilterConstraintCSIVolumesLookupFailed } for { raw := iter.Next() if raw == nil { break } vol, ok := raw.(*structs.CSIVolume) if !ok { continue } pluginCount[vol.PluginID] += 1 } // For volume requests, find volumes and determine feasibility for _, req := range c.volumes { vol, err := c.ctx.State().CSIVolumeByID(ws, c.namespace, req.Source) if err != nil { return false, FilterConstraintCSIVolumesLookupFailed } if vol == nil { return false, fmt.Sprintf(FilterConstraintCSIVolumeNotFoundTemplate, req.Source) } // Check that this node has a healthy running plugin with the right PluginID plugin, ok := n.CSINodePlugins[vol.PluginID] if !ok { return false, fmt.Sprintf(FilterConstraintCSIPluginTemplate, vol.PluginID, n.ID) } if !plugin.Healthy { return false, fmt.Sprintf(FilterConstraintCSIPluginUnhealthyTemplate, vol.PluginID, n.ID) } if pluginCount[vol.PluginID] >= plugin.NodeInfo.MaxVolumes { return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID) } // CSI spec: "If requisite is specified, the provisioned // volume MUST be accessible from at least one of the // requisite topologies." if len(vol.Topologies) > 0 { if !plugin.NodeInfo.AccessibleTopology.MatchFound(vol.Topologies) { return false, FilterConstraintsCSIPluginTopology } } if req.ReadOnly { if !vol.ReadSchedulable() { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID) } } else { if !vol.WriteSchedulable() { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID) } if !vol.HasFreeWriteClaims() { for id := range vol.WriteAllocs { a, err := c.ctx.State().AllocByID(ws, id) // the alloc for this blocking claim has been // garbage collected but the volumewatcher hasn't // finished releasing the claim (and possibly // detaching the volume), so we need to block // until it can be scheduled if err != nil || a == nil { return false, fmt.Sprintf( FilterConstraintCSIVolumeGCdAllocationTemplate, vol.ID, id) } else if a.Namespace != c.namespace || a.JobID != c.jobID { // the blocking claim is for another live job // so it's legitimately blocking more write // claims return false, fmt.Sprintf( FilterConstraintCSIVolumeInUseTemplate, vol.ID) } } } } } return true, "" } // NetworkChecker is a FeasibilityChecker which returns whether a node has the // network resources necessary to schedule the task group type NetworkChecker struct { ctx Context networkMode string ports []structs.Port } func NewNetworkChecker(ctx Context) *NetworkChecker { return &NetworkChecker{ctx: ctx, networkMode: "host"} } func (c *NetworkChecker) SetNetwork(network *structs.NetworkResource) { c.networkMode = network.Mode if c.networkMode == "" { c.networkMode = "host" } c.ports = make([]structs.Port, len(network.DynamicPorts)+len(network.ReservedPorts)) c.ports = append(c.ports, network.DynamicPorts...) c.ports = append(c.ports, network.ReservedPorts...) } func (c *NetworkChecker) Feasible(option *structs.Node) bool { if !c.hasNetwork(option) { // special case - if the client is running a version older than 0.12 but // the server is 0.12 or newer, we need to maintain an upgrade path for // jobs looking for a bridge network that will not have been fingerprinted // on the client (which was added in 0.12) if c.networkMode == "bridge" { sv, err := version.NewSemver(option.Attributes["nomad.version"]) if err == nil && predatesBridgeFingerprint.Check(sv) { return true } } c.ctx.Metrics().FilterNode(option, "missing network") return false } if c.ports != nil { if !c.hasHostNetworks(option) { return false } } return true } func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool { for _, port := range c.ports { if port.HostNetwork != "" { hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option) if !hostNetworkOk { c.ctx.Metrics().FilterNode(option, fmt.Sprintf("invalid host network %q template for port %q", port.HostNetwork, port.Label)) return false } found := false for _, net := range option.NodeResources.NodeNetworks { if net.HasAlias(hostNetworkValue.(string)) { found = true break } } if !found { c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue.(string), port.Label)) return false } } } return true } func (c *NetworkChecker) hasNetwork(option *structs.Node) bool { if option.NodeResources == nil { return false } for _, nw := range option.NodeResources.Networks { mode := nw.Mode if mode == "" { mode = "host" } if mode == c.networkMode { return true } } return false } // DriverChecker is a FeasibilityChecker which returns whether a node has the // drivers necessary to scheduler a task group. type DriverChecker struct { ctx Context drivers map[string]struct{} } // NewDriverChecker creates a DriverChecker from a set of drivers func NewDriverChecker(ctx Context, drivers map[string]struct{}) *DriverChecker { return &DriverChecker{ ctx: ctx, drivers: drivers, } } func (c *DriverChecker) SetDrivers(d map[string]struct{}) { c.drivers = d } func (c *DriverChecker) Feasible(option *structs.Node) bool { // Use this node if possible if c.hasDrivers(option) { return true } c.ctx.Metrics().FilterNode(option, FilterConstraintDrivers) return false } // hasDrivers is used to check if the node has all the appropriate // drivers for this task group. Drivers are registered as node attribute // like "driver.docker=1" with their corresponding version. func (c *DriverChecker) hasDrivers(option *structs.Node) bool { for driver := range c.drivers { driverStr := fmt.Sprintf("driver.%s", driver) // COMPAT: Remove in 0.10: As of Nomad 0.8, nodes have a DriverInfo that // corresponds with every driver. As a Nomad server might be on a later // version than a Nomad client, we need to check for compatibility here // to verify the client supports this. if driverInfo, ok := option.Drivers[driver]; ok { if driverInfo == nil { c.ctx.Logger().Named("driver_checker").Warn("node has no driver info set", "node_id", option.ID, "driver", driver) return false } if driverInfo.Detected && driverInfo.Healthy { continue } else { return false } } value, ok := option.Attributes[driverStr] if !ok { return false } enabled, err := strconv.ParseBool(value) if err != nil { c.ctx.Logger().Named("driver_checker").Warn("node has invalid driver setting", "node_id", option.ID, "driver", driver, "val", value) return false } if !enabled { return false } } return true } // DistinctHostsIterator is a FeasibleIterator which returns nodes that pass the // distinct_hosts constraint. The constraint ensures that multiple allocations // do not exist on the same node. type DistinctHostsIterator struct { ctx Context source FeasibleIterator tg *structs.TaskGroup job *structs.Job // Store whether the Job or TaskGroup has a distinct_hosts constraints so // they don't have to be calculated every time Next() is called. tgDistinctHosts bool jobDistinctHosts bool } // NewDistinctHostsIterator creates a DistinctHostsIterator from a source. func NewDistinctHostsIterator(ctx Context, source FeasibleIterator) *DistinctHostsIterator { return &DistinctHostsIterator{ ctx: ctx, source: source, } } func (iter *DistinctHostsIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints) } func (iter *DistinctHostsIterator) SetJob(job *structs.Job) { iter.job = job iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints) } func (iter *DistinctHostsIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { for _, con := range constraints { if con.Operand == structs.ConstraintDistinctHosts { return true } } return false } func (iter *DistinctHostsIterator) Next() *structs.Node { for { // Get the next option from the source option := iter.source.Next() // Hot-path if the option is nil or there are no distinct_hosts or // distinct_property constraints. hosts := iter.jobDistinctHosts || iter.tgDistinctHosts if option == nil || !hosts { return option } // Check if the host constraints are satisfied if !iter.satisfiesDistinctHosts(option) { iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) continue } return option } } // satisfiesDistinctHosts checks if the node satisfies a distinct_hosts // constraint either specified at the job level or the TaskGroup level. func (iter *DistinctHostsIterator) satisfiesDistinctHosts(option *structs.Node) bool { // Check if there is no constraint set. if !(iter.jobDistinctHosts || iter.tgDistinctHosts) { return true } // Get the proposed allocations proposed, err := iter.ctx.ProposedAllocs(option.ID) if err != nil { iter.ctx.Logger().Named("distinct_hosts").Error("failed to get proposed allocations", "error", err) return false } // Skip the node if the task group has already been allocated on it. for _, alloc := range proposed { // If the job has a distinct_hosts constraint we only need an alloc // collision on the JobID but if the constraint is on the TaskGroup then // we need both a job and TaskGroup collision. jobCollision := alloc.JobID == iter.job.ID taskCollision := alloc.TaskGroup == iter.tg.Name if iter.jobDistinctHosts && jobCollision || jobCollision && taskCollision { return false } } return true } func (iter *DistinctHostsIterator) Reset() { iter.source.Reset() } // DistinctPropertyIterator is a FeasibleIterator which returns nodes that pass the // distinct_property constraint. The constraint ensures that multiple allocations // do not use the same value of the given property. type DistinctPropertyIterator struct { ctx Context source FeasibleIterator tg *structs.TaskGroup job *structs.Job hasDistinctPropertyConstraints bool jobPropertySets []*propertySet groupPropertySets map[string][]*propertySet } // NewDistinctPropertyIterator creates a DistinctPropertyIterator from a source. func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *DistinctPropertyIterator { return &DistinctPropertyIterator{ ctx: ctx, source: source, groupPropertySets: make(map[string][]*propertySet), } } func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg // Build the property set at the taskgroup level if _, ok := iter.groupPropertySets[tg.Name]; !ok { for _, c := range tg.Constraints { if c.Operand != structs.ConstraintDistinctProperty { continue } pset := NewPropertySet(iter.ctx, iter.job) pset.SetTGConstraint(c, tg.Name) iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) } } // Check if there is a distinct property iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0 } func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { iter.job = job // Build the property set at the job level for _, c := range job.Constraints { if c.Operand != structs.ConstraintDistinctProperty { continue } pset := NewPropertySet(iter.ctx, job) pset.SetJobConstraint(c) iter.jobPropertySets = append(iter.jobPropertySets, pset) } } func (iter *DistinctPropertyIterator) Next() *structs.Node { for { // Get the next option from the source option := iter.source.Next() // Hot path if there is nothing to check if option == nil || !iter.hasDistinctPropertyConstraints { return option } // Check if the constraints are met if !iter.satisfiesProperties(option, iter.jobPropertySets) || !iter.satisfiesProperties(option, iter.groupPropertySets[iter.tg.Name]) { continue } return option } } // satisfiesProperties returns whether the option satisfies the set of // properties. If not it will be filtered. func (iter *DistinctPropertyIterator) satisfiesProperties(option *structs.Node, set []*propertySet) bool { for _, ps := range set { if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { iter.ctx.Metrics().FilterNode(option, reason) return false } } return true } func (iter *DistinctPropertyIterator) Reset() { iter.source.Reset() for _, ps := range iter.jobPropertySets { ps.PopulateProposed() } for _, sets := range iter.groupPropertySets { for _, ps := range sets { ps.PopulateProposed() } } } // ConstraintChecker is a FeasibilityChecker which returns nodes that match a // given set of constraints. This is used to filter on job, task group, and task // constraints. type ConstraintChecker struct { ctx Context constraints []*structs.Constraint } // NewConstraintChecker creates a ConstraintChecker for a set of constraints func NewConstraintChecker(ctx Context, constraints []*structs.Constraint) *ConstraintChecker { return &ConstraintChecker{ ctx: ctx, constraints: constraints, } } func (c *ConstraintChecker) SetConstraints(constraints []*structs.Constraint) { c.constraints = constraints } func (c *ConstraintChecker) Feasible(option *structs.Node) bool { // Use this node if possible for _, constraint := range c.constraints { if !c.meetsConstraint(constraint, option) { c.ctx.Metrics().FilterNode(option, constraint.String()) return false } } return true } func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool { // Resolve the targets. Targets that are not present are treated as `nil`. // This is to allow for matching constraints where a target is not present. lVal, lOk := resolveTarget(constraint.LTarget, option) rVal, rOk := resolveTarget(constraint.RTarget, option) // Check if satisfied return checkConstraint(c.ctx, constraint.Operand, lVal, rVal, lOk, rOk) } // resolveTarget is used to resolve the LTarget and RTarget of a Constraint. func resolveTarget(target string, node *structs.Node) (interface{}, bool) { // If no prefix, this must be a literal value if !strings.HasPrefix(target, "${") { return target, true } // Handle the interpolations switch { case "${node.unique.id}" == target: return node.ID, true case "${node.datacenter}" == target: return node.Datacenter, true case "${node.unique.name}" == target: return node.Name, true case "${node.class}" == target: return node.NodeClass, true case strings.HasPrefix(target, "${attr."): attr := strings.TrimSuffix(strings.TrimPrefix(target, "${attr."), "}") val, ok := node.Attributes[attr] return val, ok case strings.HasPrefix(target, "${meta."): meta := strings.TrimSuffix(strings.TrimPrefix(target, "${meta."), "}") val, ok := node.Meta[meta] return val, ok default: return nil, false } } // checkConstraint checks if a constraint is satisfied. The lVal and rVal // interfaces may be nil. func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool { // Check for constraints not handled by this checker. switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: return true default: break } switch operand { case "=", "==", "is": return lFound && rFound && reflect.DeepEqual(lVal, rVal) case "!=", "not": return !reflect.DeepEqual(lVal, rVal) case "<", "<=", ">", ">=": return lFound && rFound && checkLexicalOrder(operand, lVal, rVal) case structs.ConstraintAttributeIsSet: return lFound case structs.ConstraintAttributeIsNotSet: return !lFound case structs.ConstraintVersion: parser := newVersionConstraintParser(ctx) return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) case structs.ConstraintSemver: parser := newSemverConstraintParser(ctx) return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) case structs.ConstraintRegex: return lFound && rFound && checkRegexpMatch(ctx, lVal, rVal) case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: return lFound && rFound && checkSetContainsAll(ctx, lVal, rVal) case structs.ConstraintSetContainsAny: return lFound && rFound && checkSetContainsAny(lVal, rVal) default: return false } } // checkAffinity checks if a specific affinity is satisfied func checkAffinity(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool { return checkConstraint(ctx, operand, lVal, rVal, lFound, rFound) } // checkAttributeAffinity checks if an affinity is satisfied func checkAttributeAffinity(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { return checkAttributeConstraint(ctx, operand, lVal, rVal, lFound, rFound) } // checkLexicalOrder is used to check for lexical ordering func checkLexicalOrder(op string, lVal, rVal interface{}) bool { // Ensure the values are strings lStr, ok := lVal.(string) if !ok { return false } rStr, ok := rVal.(string) if !ok { return false } switch op { case "<": return lStr < rStr case "<=": return lStr <= rStr case ">": return lStr > rStr case ">=": return lStr >= rStr default: return false } } // checkVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side func checkVersionMatch(ctx Context, parse verConstraintParser, lVal, rVal interface{}) bool { // Parse the version var versionStr string switch v := lVal.(type) { case string: versionStr = v case int: versionStr = fmt.Sprintf("%d", v) default: return false } // Parse the version vers, err := version.NewVersion(versionStr) if err != nil { return false } // Constraint must be a string constraintStr, ok := rVal.(string) if !ok { return false } // Parse the constraints constraints := parse(constraintStr) if constraints == nil { return false } // Check the constraints against the version return constraints.Check(vers) } // checkAttributeVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side func checkAttributeVersionMatch(ctx Context, parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool { // Parse the version var versionStr string if s, ok := lVal.GetString(); ok { versionStr = s } else if i, ok := lVal.GetInt(); ok { versionStr = fmt.Sprintf("%d", i) } else { return false } // Parse the version vers, err := version.NewVersion(versionStr) if err != nil { return false } // Constraint must be a string constraintStr, ok := rVal.GetString() if !ok { return false } // Parse the constraints constraints := parse(constraintStr) if constraints == nil { return false } // Check the constraints against the version return constraints.Check(vers) } // checkRegexpMatch is used to compare a value on the // left hand side with a regexp on the right hand side func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { return false } // Regexp must be a string regexpStr, ok := rVal.(string) if !ok { return false } // Check the cache cache := ctx.RegexpCache() re := cache[regexpStr] // Parse the regexp if re == nil { var err error re, err = regexp.Compile(regexpStr) if err != nil { return false } cache[regexpStr] = re } // Look for a match return re.MatchString(lStr) } // checkSetContainsAll is used to see if the left hand side contains the // string on the right hand side func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { return false } // Regexp must be a string rStr, ok := rVal.(string) if !ok { return false } input := strings.Split(lStr, ",") lookup := make(map[string]struct{}, len(input)) for _, in := range input { cleaned := strings.TrimSpace(in) lookup[cleaned] = struct{}{} } for _, r := range strings.Split(rStr, ",") { cleaned := strings.TrimSpace(r) if _, ok := lookup[cleaned]; !ok { return false } } return true } // checkSetContainsAny is used to see if the left hand side contains any // values on the right hand side func checkSetContainsAny(lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { return false } // RHS must be a string rStr, ok := rVal.(string) if !ok { return false } input := strings.Split(lStr, ",") lookup := make(map[string]struct{}, len(input)) for _, in := range input { cleaned := strings.TrimSpace(in) lookup[cleaned] = struct{}{} } for _, r := range strings.Split(rStr, ",") { cleaned := strings.TrimSpace(r) if _, ok := lookup[cleaned]; ok { return true } } return false } // FeasibilityWrapper is a FeasibleIterator which wraps both job and task group // FeasibilityCheckers in which feasibility checking can be skipped if the // computed node class has previously been marked as eligible or ineligible. type FeasibilityWrapper struct { ctx Context source FeasibleIterator jobCheckers []FeasibilityChecker tgCheckers []FeasibilityChecker tgAvailable []FeasibilityChecker tg string } // NewFeasibilityWrapper returns a FeasibleIterator based on the passed source // and FeasibilityCheckers. func NewFeasibilityWrapper(ctx Context, source FeasibleIterator, jobCheckers, tgCheckers, tgAvailable []FeasibilityChecker) *FeasibilityWrapper { return &FeasibilityWrapper{ ctx: ctx, source: source, jobCheckers: jobCheckers, tgCheckers: tgCheckers, tgAvailable: tgAvailable, } } func (w *FeasibilityWrapper) SetTaskGroup(tg string) { w.tg = tg } func (w *FeasibilityWrapper) Reset() { w.source.Reset() } // Next returns an eligible node, only running the FeasibilityCheckers as needed // based on the sources computed node class. func (w *FeasibilityWrapper) Next() *structs.Node { evalElig := w.ctx.Eligibility() metrics := w.ctx.Metrics() OUTER: for { // Get the next option from the source option := w.source.Next() if option == nil { return nil } // Check if the job has been marked as eligible or ineligible. jobEscaped, jobUnknown := false, false switch evalElig.JobStatus(option.ComputedClass) { case EvalComputedClassIneligible: // Fast path the ineligible case metrics.FilterNode(option, "computed class ineligible") continue case EvalComputedClassEscaped: jobEscaped = true case EvalComputedClassUnknown: jobUnknown = true } // Run the job feasibility checks. for _, check := range w.jobCheckers { feasible := check.Feasible(option) if !feasible { // If the job hasn't escaped, set it to be ineligible since it // failed a job check. if !jobEscaped { evalElig.SetJobEligibility(false, option.ComputedClass) } continue OUTER } } // Set the job eligibility if the constraints weren't escaped and it // hasn't been set before. if !jobEscaped && jobUnknown { evalElig.SetJobEligibility(true, option.ComputedClass) } // Check if the task group has been marked as eligible or ineligible. tgEscaped, tgUnknown := false, false switch evalElig.TaskGroupStatus(w.tg, option.ComputedClass) { case EvalComputedClassIneligible: // Fast path the ineligible case metrics.FilterNode(option, "computed class ineligible") continue case EvalComputedClassEligible: // Fast path the eligible case if w.available(option) { return option } // We match the class but are temporarily unavailable continue OUTER case EvalComputedClassEscaped: tgEscaped = true case EvalComputedClassUnknown: tgUnknown = true } // Run the task group feasibility checks. for _, check := range w.tgCheckers { feasible := check.Feasible(option) if !feasible { // If the task group hasn't escaped, set it to be ineligible // since it failed a check. if !tgEscaped { evalElig.SetTaskGroupEligibility(false, w.tg, option.ComputedClass) } continue OUTER } } // Set the task group eligibility if the constraints weren't escaped and // it hasn't been set before. if !tgEscaped && tgUnknown { evalElig.SetTaskGroupEligibility(true, w.tg, option.ComputedClass) } // tgAvailable handlers are available transiently, so we test them without // affecting the computed class if !w.available(option) { continue OUTER } return option } } // available checks transient feasibility checkers which depend on changing conditions, // e.g. the health status of a plugin or driver func (w *FeasibilityWrapper) available(option *structs.Node) bool { // If we don't have any availability checks, we're available if len(w.tgAvailable) == 0 { return true } for _, check := range w.tgAvailable { if !check.Feasible(option) { return false } } return true } // DeviceChecker is a FeasibilityChecker which returns whether a node has the // devices necessary to scheduler a task group. type DeviceChecker struct { ctx Context // required is the set of requested devices that must exist on the node required []*structs.RequestedDevice // requiresDevices indicates if the task group requires devices requiresDevices bool } // NewDeviceChecker creates a DeviceChecker func NewDeviceChecker(ctx Context) *DeviceChecker { return &DeviceChecker{ ctx: ctx, } } func (c *DeviceChecker) SetTaskGroup(tg *structs.TaskGroup) { c.required = nil for _, task := range tg.Tasks { c.required = append(c.required, task.Resources.Devices...) } c.requiresDevices = len(c.required) != 0 } func (c *DeviceChecker) Feasible(option *structs.Node) bool { if c.hasDevices(option) { return true } c.ctx.Metrics().FilterNode(option, FilterConstraintDevices) return false } func (c *DeviceChecker) hasDevices(option *structs.Node) bool { if !c.requiresDevices { return true } // COMPAT(0.11): Remove in 0.11 // The node does not have the new resources object so it can not have any // devices if option.NodeResources == nil { return false } // Check if the node has any devices nodeDevs := option.NodeResources.Devices if len(nodeDevs) == 0 { return false } // Create a mapping of node devices to the remaining count available := make(map[*structs.NodeDeviceResource]uint64, len(nodeDevs)) for _, d := range nodeDevs { var healthy uint64 = 0 for _, instance := range d.Instances { if instance.Healthy { healthy++ } } if healthy != 0 { available[d] = healthy } } // Go through the required devices trying to find matches OUTER: for _, req := range c.required { // Determine how many there are to place desiredCount := req.Count // Go through the device resources and see if we have a match for d, unused := range available { if unused == 0 { // Depleted continue } // First check we have enough instances of the device since this is // cheaper than checking the constraints if unused < desiredCount { continue } // Check the constraints if nodeDeviceMatches(c.ctx, d, req) { // Consume the instances available[d] -= desiredCount // Move on to the next request continue OUTER } } // We couldn't match the request for the device return false } // Only satisfied if there are no more devices to place return true } // nodeDeviceMatches checks if the device matches the request and its // constraints. It doesn't check the count. func nodeDeviceMatches(ctx Context, d *structs.NodeDeviceResource, req *structs.RequestedDevice) bool { if !d.ID().Matches(req.ID()) { return false } // There are no constraints to consider if len(req.Constraints) == 0 { return true } for _, c := range req.Constraints { // Resolve the targets lVal, lOk := resolveDeviceTarget(c.LTarget, d) rVal, rOk := resolveDeviceTarget(c.RTarget, d) // Check if satisfied if !checkAttributeConstraint(ctx, c.Operand, lVal, rVal, lOk, rOk) { return false } } return true } // resolveDeviceTarget is used to resolve the LTarget and RTarget of a Constraint // when used on a device func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstructs.Attribute, bool) { // If no prefix, this must be a literal value if !strings.HasPrefix(target, "${") { return psstructs.ParseAttribute(target), true } // Handle the interpolations switch { case "${device.model}" == target: return psstructs.NewStringAttribute(d.Name), true case "${device.vendor}" == target: return psstructs.NewStringAttribute(d.Vendor), true case "${device.type}" == target: return psstructs.NewStringAttribute(d.Type), true case strings.HasPrefix(target, "${device.attr."): attr := strings.TrimPrefix(target, "${device.attr.") attr = strings.TrimSuffix(attr, "}") val, ok := d.Attributes[attr] return val, ok default: return nil, false } } // checkAttributeConstraint checks if a constraint is satisfied. nil equality // comparisons are considered to be false. func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { // Check for constraints not handled by this checker. switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: return true default: break } switch operand { case "!=", "not": // Neither value was provided, nil != nil == false if !(lFound || rFound) { return false } // Only 1 value was provided, therefore nil != some == true if lFound != rFound { return true } // Both values were provided, so actually compare them v, ok := lVal.Compare(rVal) if !ok { return false } return v != 0 case "<", "<=", ">", ">=", "=", "==", "is": if !(lFound && rFound) { return false } v, ok := lVal.Compare(rVal) if !ok { return false } switch operand { case "is", "==", "=": return v == 0 case "<": return v == -1 case "<=": return v != 1 case ">": return v == 1 case ">=": return v != -1 default: return false } case structs.ConstraintVersion: if !(lFound && rFound) { return false } parser := newVersionConstraintParser(ctx) return checkAttributeVersionMatch(ctx, parser, lVal, rVal) case structs.ConstraintSemver: if !(lFound && rFound) { return false } parser := newSemverConstraintParser(ctx) return checkAttributeVersionMatch(ctx, parser, lVal, rVal) case structs.ConstraintRegex: if !(lFound && rFound) { return false } ls, ok := lVal.GetString() rs, ok2 := rVal.GetString() if !ok || !ok2 { return false } return checkRegexpMatch(ctx, ls, rs) case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: if !(lFound && rFound) { return false } ls, ok := lVal.GetString() rs, ok2 := rVal.GetString() if !ok || !ok2 { return false } return checkSetContainsAll(ctx, ls, rs) case structs.ConstraintSetContainsAny: if !(lFound && rFound) { return false } ls, ok := lVal.GetString() rs, ok2 := rVal.GetString() if !ok || !ok2 { return false } return checkSetContainsAny(ls, rs) case structs.ConstraintAttributeIsSet: return lFound case structs.ConstraintAttributeIsNotSet: return !lFound default: return false } } // VerConstraints is the interface implemented by both go-verson constraints // and semver constraints. type VerConstraints interface { Check(v *version.Version) bool String() string } // verConstraintParser returns a version constraints implementation (go-version // or semver). type verConstraintParser func(verConstraint string) VerConstraints func newVersionConstraintParser(ctx Context) verConstraintParser { cache := ctx.VersionConstraintCache() return func(cstr string) VerConstraints { if c := cache[cstr]; c != nil { return c } constraints, err := version.NewConstraint(cstr) if err != nil { return nil } cache[cstr] = constraints return constraints } } func newSemverConstraintParser(ctx Context) verConstraintParser { cache := ctx.SemverConstraintCache() return func(cstr string) VerConstraints { if c := cache[cstr]; c != nil { return c } constraints, err := semver.NewConstraint(cstr) if err != nil { return nil } cache[cstr] = constraints return constraints } }