open-nomad/nomad/structs/funcs.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

582 lines
15 KiB
Go
Raw Normal View History

2015-08-05 00:19:05 +00:00
package structs
2015-09-07 22:08:50 +00:00
import (
2018-01-12 21:58:44 +00:00
"crypto/subtle"
"encoding/base64"
"encoding/binary"
2015-09-07 22:08:50 +00:00
"fmt"
"math"
"sort"
2018-10-02 20:36:04 +00:00
"strconv"
"strings"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
2017-08-20 21:30:27 +00:00
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/acl"
"golang.org/x/crypto/blake2b"
2015-09-07 22:08:50 +00:00
)
2015-08-13 18:54:59 +00:00
// MergeMultierrorWarnings takes job warnings and canonicalize warnings and
// merges them into a returnable string. Both the errors may be nil.
func MergeMultierrorWarnings(errs ...error) string {
if len(errs) == 0 {
2017-09-14 04:48:52 +00:00
return ""
}
var mErr multierror.Error
_ = multierror.Append(&mErr, errs...)
mErr.ErrorFormat = warningsFormatter
return mErr.Error()
}
// warningsFormatter is used to format job warnings
func warningsFormatter(es []error) string {
sb := strings.Builder{}
sb.WriteString(fmt.Sprintf("%d warning(s):\n", len(es)))
for i := range es {
sb.WriteString(fmt.Sprintf("\n* %s", es[i]))
}
return sb.String()
}
2015-08-05 00:19:05 +00:00
// RemoveAllocs is used to remove any allocs with the given IDs
// from the list of allocations
func RemoveAllocs(allocs []*Allocation, remove []*Allocation) []*Allocation {
if len(remove) == 0 {
return allocs
}
2015-08-05 00:19:05 +00:00
// Convert remove into a set
removeSet := make(map[string]struct{})
for _, remove := range remove {
removeSet[remove.ID] = struct{}{}
2015-08-05 00:19:05 +00:00
}
r := make([]*Allocation, 0, len(allocs))
for _, alloc := range allocs {
if _, ok := removeSet[alloc.ID]; !ok {
r = append(r, alloc)
2015-08-05 00:19:05 +00:00
}
}
return r
2015-08-05 00:19:05 +00:00
}
2015-08-05 00:28:19 +00:00
func AllocSubset(allocs []*Allocation, subset []*Allocation) bool {
if len(subset) == 0 {
return true
}
// Convert allocs into a map
allocMap := make(map[string]struct{})
for _, alloc := range allocs {
allocMap[alloc.ID] = struct{}{}
}
for _, alloc := range subset {
if _, ok := allocMap[alloc.ID]; !ok {
return false
}
}
return true
}
// FilterTerminalAllocs filters out all allocations in a terminal state and
// returns the latest terminal allocations.
func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) {
terminalAllocsByName := make(map[string]*Allocation)
2015-08-23 01:27:51 +00:00
n := len(allocs)
2015-08-23 01:27:51 +00:00
for i := 0; i < n; i++ {
if allocs[i].TerminalStatus() {
// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}
// Remove the allocation
2015-08-23 01:27:51 +00:00
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
n--
}
}
return allocs[:n], terminalAllocsByName
2015-08-23 01:27:51 +00:00
}
// SplitTerminalAllocs splits allocs into non-terminal and terminal allocs, with
// the terminal allocs indexed by node->alloc.name.
func SplitTerminalAllocs(allocs []*Allocation) ([]*Allocation, TerminalByNodeByName) {
var alive []*Allocation
var terminal = make(TerminalByNodeByName)
for _, alloc := range allocs {
if alloc.TerminalStatus() {
terminal.Set(alloc)
} else {
alive = append(alive, alloc)
}
}
return alive, terminal
}
// TerminalByNodeByName is a map of NodeID->Allocation.Name->Allocation used by
// the sysbatch scheduler for locating the most up-to-date terminal allocations.
type TerminalByNodeByName map[string]map[string]*Allocation
func (a TerminalByNodeByName) Set(allocation *Allocation) {
node := allocation.NodeID
name := allocation.Name
if _, exists := a[node]; !exists {
a[node] = make(map[string]*Allocation)
}
if previous, exists := a[node][name]; !exists {
a[node][name] = allocation
} else if previous.CreateIndex < allocation.CreateIndex {
// keep the newest version of the terminal alloc for the coordinate
a[node][name] = allocation
}
}
func (a TerminalByNodeByName) Get(nodeID, name string) (*Allocation, bool) {
if _, exists := a[nodeID]; !exists {
return nil, false
}
if _, exists := a[nodeID][name]; !exists {
return nil, false
}
return a[nodeID][name], true
}
// AllocsFit checks if a given set of allocations will fit on a node.
// The netIdx can optionally be provided if its already been computed.
// If the netIdx is provided, it is assumed that the client has already
// ensured there are no collisions. If checkDevices is set to true, we check if
// there is a device oversubscription.
func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) {
core: fix node reservation scoring The BinPackIter accounted for node reservations twice when scoring nodes which could bias scores toward nodes with reservations. Pseudo-code for previous algorithm: ``` proposed = reservedResources + sum(allocsResources) available = nodeResources - reservedResources score = 1 - (proposed / available) ``` The node's reserved resources are added to the total resources used by allocations, and then the node's reserved resources are later substracted from the node's overall resources. The new algorithm is: ``` proposed = sum(allocResources) available = nodeResources - reservedResources score = 1 - (proposed / available) ``` The node's reserved resources are no longer added to the total resources used by allocations. My guess as to how this bug happened is that the resource utilization variable (`util`) is calculated and returned by the `AllocsFit` function which needs to take reserved resources into account as a basic feasibility check. To avoid re-calculating alloc resource usage (because there may be a large number of allocs), we reused `util` in the `ScoreFit` function. `ScoreFit` properly accounts for reserved resources by subtracting them from the node's overall resources. However since `util` _also_ took reserved resources into account the score would be incorrect. Prior to the fix the added test output: ``` Node: reserved Score: 1.0000 Node: reserved2 Score: 1.0000 Node: no-reserved Score: 0.9741 ``` The scores being 1.0 for *both* nodes with reserved resources is a good hint something is wrong as they should receive different scores. Upon further inspection the double accounting of reserved resources caused their scores to be >1.0 and clamped. After the fix the added test outputs: ``` Node: no-reserved Score: 0.9741 Node: reserved Score: 0.9480 Node: reserved2 Score: 0.8717 ```
2020-04-15 21:24:47 +00:00
// Compute the allocs' utilization from zero
2018-10-04 21:33:09 +00:00
used := new(ComparableResources)
2015-08-05 00:48:24 +00:00
reservedCores := map[uint16]struct{}{}
var coreOverlap bool
2015-08-05 00:48:24 +00:00
// For each alloc, add the resources
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
if alloc.ClientTerminalStatus() {
continue
}
cr := alloc.ComparableResources()
used.Add(cr)
// Adding the comparable resource unions reserved core sets, need to check if reserved cores overlap
for _, core := range cr.Flattened.Cpu.ReservedCores {
if _, ok := reservedCores[core]; ok {
coreOverlap = true
} else {
reservedCores[core] = struct{}{}
}
}
}
if coreOverlap {
return false, "cores", used, nil
2015-08-05 00:48:24 +00:00
}
core: fix node reservation scoring The BinPackIter accounted for node reservations twice when scoring nodes which could bias scores toward nodes with reservations. Pseudo-code for previous algorithm: ``` proposed = reservedResources + sum(allocsResources) available = nodeResources - reservedResources score = 1 - (proposed / available) ``` The node's reserved resources are added to the total resources used by allocations, and then the node's reserved resources are later substracted from the node's overall resources. The new algorithm is: ``` proposed = sum(allocResources) available = nodeResources - reservedResources score = 1 - (proposed / available) ``` The node's reserved resources are no longer added to the total resources used by allocations. My guess as to how this bug happened is that the resource utilization variable (`util`) is calculated and returned by the `AllocsFit` function which needs to take reserved resources into account as a basic feasibility check. To avoid re-calculating alloc resource usage (because there may be a large number of allocs), we reused `util` in the `ScoreFit` function. `ScoreFit` properly accounts for reserved resources by subtracting them from the node's overall resources. However since `util` _also_ took reserved resources into account the score would be incorrect. Prior to the fix the added test output: ``` Node: reserved Score: 1.0000 Node: reserved2 Score: 1.0000 Node: no-reserved Score: 0.9741 ``` The scores being 1.0 for *both* nodes with reserved resources is a good hint something is wrong as they should receive different scores. Upon further inspection the double accounting of reserved resources caused their scores to be >1.0 and clamped. After the fix the added test outputs: ``` Node: no-reserved Score: 0.9741 Node: reserved Score: 0.9480 Node: reserved2 Score: 0.8717 ```
2020-04-15 21:24:47 +00:00
// Check that the node resources (after subtracting reserved) are a
// super set of those that are being allocated
available := node.ComparableResources()
available.Subtract(node.ComparableReservedResources())
if superset, dimension := available.Superset(used); !superset {
return false, dimension, used, nil
2015-08-05 00:48:24 +00:00
}
// Create the network index if missing
if netIdx == nil {
netIdx = NewNetworkIndex()
defer netIdx.Release()
core: merge reserved_ports into host_networks (#13651) Fixes #13505 This fixes #13505 by treating reserved_ports like we treat a lot of jobspec settings: merging settings from more global stanzas (client.reserved.reserved_ports) "down" into more specific stanzas (client.host_networks[].reserved_ports). As discussed in #13505 there are other options, and since it's totally broken right now we have some flexibility: Treat overlapping reserved_ports on addresses as invalid and refuse to start agents. However, I'm not sure there's a cohesive model we want to publish right now since so much 0.9-0.12 compat code still exists! We would have to explain to folks that if their -network-interface and host_network addresses overlapped, they could only specify reserved_ports in one place or the other?! It gets ugly. Use the global client.reserved.reserved_ports value as the default and treat host_network[].reserverd_ports as overrides. My first suggestion in the issue, but @groggemans made me realize the addresses on the agent's interface (as configured by -network-interface) may overlap with host_networks, so you'd need to remove the global reserved_ports from addresses shared with a shared network?! This seemed really confusing and subtle for users to me. So I think "merging down" creates the most expressive yet understandable approach. I've played around with it a bit, and it doesn't seem too surprising. The only frustrating part is how difficult it is to observe the available addresses and ports on a node! However that's a job for another PR.
2022-07-12 21:40:25 +00:00
if err := netIdx.SetNode(node); err != nil {
// To maintain backward compatibility with when SetNode
// returned collision+reason like AddAllocs, return
// this as a reason instead of an error.
return false, fmt.Sprintf("reserved node port collision: %v", err), used, nil
}
if collision, reason := netIdx.AddAllocs(allocs); collision {
return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil
}
}
// Check if the network is overcommitted
if netIdx.Overcommitted() {
return false, "bandwidth exceeded", used, nil
2015-08-05 00:48:24 +00:00
}
// Check devices
if checkDevices {
accounter := NewDeviceAccounter(node)
if accounter.AddAllocs(allocs) {
return false, "device oversubscribed", used, nil
}
}
2015-08-05 00:48:24 +00:00
// Allocations fit!
return true, "", used, nil
2015-08-13 18:54:59 +00:00
}
2020-04-24 14:47:43 +00:00
func computeFreePercentage(node *Node, util *ComparableResources) (freePctCpu, freePctRam float64) {
2018-10-02 20:36:04 +00:00
// COMPAT(0.11): Remove in 0.11
reserved := node.ComparableReservedResources()
res := node.ComparableResources()
2015-08-13 18:54:59 +00:00
// Determine the node availability
2018-10-02 20:36:04 +00:00
nodeCpu := float64(res.Flattened.Cpu.CpuShares)
nodeMem := float64(res.Flattened.Memory.MemoryMB)
if reserved != nil {
nodeCpu -= float64(reserved.Flattened.Cpu.CpuShares)
nodeMem -= float64(reserved.Flattened.Memory.MemoryMB)
2015-08-13 18:54:59 +00:00
}
// Compute the free percentage
2020-04-24 14:47:43 +00:00
freePctCpu = 1 - (float64(util.Flattened.Cpu.CpuShares) / nodeCpu)
freePctRam = 1 - (float64(util.Flattened.Memory.MemoryMB) / nodeMem)
return freePctCpu, freePctRam
}
// ScoreFitBinPack computes a fit score to achieve pinbacking behavior.
// Score is in [0, 18]
//
// It's the BestFit v3 on the Google work published here:
// http://www.columbia.edu/~cs2035/courses/ieor4405.S13/datacenter_scheduling.ppt
func ScoreFitBinPack(node *Node, util *ComparableResources) float64 {
freePctCpu, freePctRam := computeFreePercentage(node, util)
2015-08-13 18:54:59 +00:00
// Total will be "maximized" the smaller the value is.
// At 100% utilization, the total is 2, while at 0% util it is 20.
total := math.Pow(10, freePctCpu) + math.Pow(10, freePctRam)
// Invert so that the "maximized" total represents a high-value
// score. Because the floor is 20, we simply use that as an anchor.
// This means at a perfect fit, we return 18 as the score.
score := 20.0 - total
// Bound the score, just in case
// If the score is over 18, that means we've overfit the node.
if score > 18.0 {
score = 18.0
} else if score < 0 {
score = 0
}
return score
2015-08-05 00:48:24 +00:00
}
2015-09-07 22:08:50 +00:00
2021-07-16 05:49:15 +00:00
// ScoreFitSpread computes a fit score to achieve spread behavior.
2020-04-24 14:47:43 +00:00
// Score is in [0, 18]
//
// This is equivalent to Worst Fit of
// http://www.columbia.edu/~cs2035/courses/ieor4405.S13/datacenter_scheduling.ppt
func ScoreFitSpread(node *Node, util *ComparableResources) float64 {
freePctCpu, freePctRam := computeFreePercentage(node, util)
total := math.Pow(10, freePctCpu) + math.Pow(10, freePctRam)
score := total - 2
if score > 18.0 {
score = 18.0
} else if score < 0 {
score = 0
}
return score
}
2016-02-11 17:08:20 +00:00
func CopySliceConstraints(s []*Constraint) []*Constraint {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Constraint, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
2016-08-17 00:50:14 +00:00
2018-07-16 13:30:58 +00:00
func CopySliceAffinities(s []*Affinity) []*Affinity {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Affinity, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
2018-07-16 17:52:24 +00:00
func CopySliceSpreads(s []*Spread) []*Spread {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Spread, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
l := len(s)
if l == 0 {
return nil
}
c := make([]*SpreadTarget, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta {
l := len(s)
if l == 0 {
return nil
}
c := make([]*NodeScoreMeta, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
2016-08-17 00:50:14 +00:00
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
2016-08-18 17:50:47 +00:00
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
s := set.New[string](10)
2016-08-17 00:50:14 +00:00
for _, tgp := range policies {
for _, tp := range tgp {
if tp != nil {
s.InsertAll(tp.Policies)
2016-08-17 00:50:14 +00:00
}
}
}
return s.List()
2016-08-17 00:50:14 +00:00
}
// VaultNamespaceSet takes the structure returned by VaultPolicies and
// returns a set of required namespaces
func VaultNamespaceSet(policies map[string]map[string]*Vault) []string {
s := set.New[string](10)
for _, tgp := range policies {
for _, tp := range tgp {
if tp != nil && tp.Namespace != "" {
s.Insert(tp.Namespace)
}
}
}
return s.List()
}
// DenormalizeAllocationJobs is used to attach a job to all allocations that are
// non-terminal and do not have a job already. This is useful in cases where the
// job is normalized.
func DenormalizeAllocationJobs(job *Job, allocs []*Allocation) {
if job != nil {
for _, alloc := range allocs {
if alloc.Job == nil && !alloc.TerminalStatus() {
alloc.Job = job
}
}
}
}
2017-05-31 18:34:46 +00:00
// AllocName returns the name of the allocation given the input.
func AllocName(job, group string, idx uint) string {
return fmt.Sprintf("%s.%s[%d]", job, group, idx)
}
// AllocSuffix returns the alloc index suffix that was added by the AllocName
// function above.
func AllocSuffix(name string) string {
idx := strings.LastIndex(name, "[")
if idx == -1 {
return ""
}
suffix := name[idx:]
return suffix
}
// ACLPolicyListHash returns a consistent hash for a set of policies.
func ACLPolicyListHash(policies []*ACLPolicy) string {
cacheKeyHash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}
for _, policy := range policies {
_, _ = cacheKeyHash.Write([]byte(policy.Name))
_ = binary.Write(cacheKeyHash, binary.BigEndian, policy.ModifyIndex)
}
cacheKey := string(cacheKeyHash.Sum(nil))
return cacheKey
}
2017-08-20 21:30:27 +00:00
// CompileACLObject compiles a set of ACL policies into an ACL object with a cache
func CompileACLObject(cache *lru.TwoQueueCache, policies []*ACLPolicy) (*acl.ACL, error) {
// Sort the policies to ensure consistent ordering
2017-08-23 20:49:08 +00:00
sort.Slice(policies, func(i, j int) bool {
return policies[i].Name < policies[j].Name
})
2017-08-20 21:30:27 +00:00
// Determine the cache key
cacheKey := ACLPolicyListHash(policies)
aclRaw, ok := cache.Get(cacheKey)
if ok {
return aclRaw.(*acl.ACL), nil
}
// Parse the policies
parsed := make([]*acl.Policy, 0, len(policies))
for _, policy := range policies {
p, err := acl.Parse(policy.Rules)
if err != nil {
return nil, fmt.Errorf("failed to parse %q: %v", policy.Name, err)
}
parsed = append(parsed, p)
}
// Create the ACL object
aclObj, err := acl.NewACL(false, parsed)
if err != nil {
return nil, fmt.Errorf("failed to construct ACL: %v", err)
}
// Update the cache
cache.Add(cacheKey, aclObj)
return aclObj, nil
}
2018-01-12 21:58:44 +00:00
// GenerateMigrateToken will create a token for a client to access an
// authenticated volume of another client to migrate data for sticky volumes.
func GenerateMigrateToken(allocID, nodeSecretID string) (string, error) {
h, err := blake2b.New512([]byte(nodeSecretID))
if err != nil {
return "", err
}
_, _ = h.Write([]byte(allocID))
2018-01-12 21:58:44 +00:00
return base64.URLEncoding.EncodeToString(h.Sum(nil)), nil
}
// CompareMigrateToken returns true if two migration tokens can be computed and
// are equal.
func CompareMigrateToken(allocID, nodeSecretID, otherMigrateToken string) bool {
h, err := blake2b.New512([]byte(nodeSecretID))
if err != nil {
return false
}
_, _ = h.Write([]byte(allocID))
2018-01-12 21:58:44 +00:00
otherBytes, err := base64.URLEncoding.DecodeString(otherMigrateToken)
if err != nil {
return false
}
return subtle.ConstantTimeCompare(h.Sum(nil), otherBytes) == 1
}
2018-10-02 20:36:04 +00:00
// ParsePortRanges parses the passed port range string and returns a list of the
// ports. The specification is a comma separated list of either port numbers or
// port ranges. A port number is a single integer and a port range is two
// integers separated by a hyphen. As an example the following spec would
// convert to: ParsePortRanges("10,12-14,16") -> []uint64{10, 12, 13, 14, 16}
func ParsePortRanges(spec string) ([]uint64, error) {
parts := strings.Split(spec, ",")
// Hot path the empty case
if len(parts) == 1 && parts[0] == "" {
return nil, nil
}
ports := make(map[uint64]struct{})
for _, part := range parts {
part = strings.TrimSpace(part)
rangeParts := strings.Split(part, "-")
l := len(rangeParts)
switch l {
case 1:
if val := rangeParts[0]; val == "" {
return nil, fmt.Errorf("can't specify empty port")
} else {
port, err := strconv.ParseUint(val, 10, 0)
if err != nil {
return nil, err
}
core: merge reserved_ports into host_networks (#13651) Fixes #13505 This fixes #13505 by treating reserved_ports like we treat a lot of jobspec settings: merging settings from more global stanzas (client.reserved.reserved_ports) "down" into more specific stanzas (client.host_networks[].reserved_ports). As discussed in #13505 there are other options, and since it's totally broken right now we have some flexibility: Treat overlapping reserved_ports on addresses as invalid and refuse to start agents. However, I'm not sure there's a cohesive model we want to publish right now since so much 0.9-0.12 compat code still exists! We would have to explain to folks that if their -network-interface and host_network addresses overlapped, they could only specify reserved_ports in one place or the other?! It gets ugly. Use the global client.reserved.reserved_ports value as the default and treat host_network[].reserverd_ports as overrides. My first suggestion in the issue, but @groggemans made me realize the addresses on the agent's interface (as configured by -network-interface) may overlap with host_networks, so you'd need to remove the global reserved_ports from addresses shared with a shared network?! This seemed really confusing and subtle for users to me. So I think "merging down" creates the most expressive yet understandable approach. I've played around with it a bit, and it doesn't seem too surprising. The only frustrating part is how difficult it is to observe the available addresses and ports on a node! However that's a job for another PR.
2022-07-12 21:40:25 +00:00
if port > MaxValidPort {
return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, port)
}
2018-10-02 20:36:04 +00:00
ports[port] = struct{}{}
}
case 2:
// We are parsing a range
start, err := strconv.ParseUint(rangeParts[0], 10, 0)
if err != nil {
return nil, err
}
end, err := strconv.ParseUint(rangeParts[1], 10, 0)
if err != nil {
return nil, err
}
if end < start {
return nil, fmt.Errorf("invalid range: starting value (%v) less than ending (%v) value", end, start)
}
// Full range validation is below but prevent creating
// arbitrarily large arrays here
if end > MaxValidPort {
return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, end)
}
2018-10-02 20:36:04 +00:00
for i := start; i <= end; i++ {
ports[i] = struct{}{}
}
default:
return nil, fmt.Errorf("can only parse single port numbers or port ranges (ex. 80,100-120,150)")
}
}
var results []uint64
for port := range ports {
if port == 0 {
return nil, fmt.Errorf("port must be > 0")
}
if port > MaxValidPort {
return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, port)
}
2018-10-02 20:36:04 +00:00
results = append(results, port)
}
sort.Slice(results, func(i, j int) bool {
return results[i] < results[j]
})
return results, nil
}