open-nomad/nomad/structs/funcs.go

450 lines
11 KiB
Go
Raw Normal View History

2015-08-05 00:19:05 +00:00
package structs
2015-09-07 22:08:50 +00:00
import (
2018-01-12 21:58:44 +00:00
"crypto/subtle"
"encoding/base64"
"encoding/binary"
2015-09-07 22:08:50 +00:00
"fmt"
"math"
"sort"
2018-10-02 20:36:04 +00:00
"strconv"
"strings"
multierror "github.com/hashicorp/go-multierror"
2017-08-20 21:30:27 +00:00
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/acl"
"golang.org/x/crypto/blake2b"
2015-09-07 22:08:50 +00:00
)
2015-08-13 18:54:59 +00:00
// MergeMultierrorWarnings takes job warnings and canonicalize warnings and
// merges them into a returnable string. Both the errors may be nil.
2017-09-13 18:38:29 +00:00
func MergeMultierrorWarnings(warnings ...error) string {
var warningMsg multierror.Error
2017-09-13 18:38:29 +00:00
for _, warn := range warnings {
if warn != nil {
multierror.Append(&warningMsg, warn)
}
}
2017-09-14 04:48:52 +00:00
if len(warningMsg.Errors) == 0 {
return ""
}
// Set the formatter
warningMsg.ErrorFormat = warningsFormatter
return warningMsg.Error()
}
// warningsFormatter is used to format job warnings
func warningsFormatter(es []error) string {
points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %s", err)
}
return fmt.Sprintf(
"%d warning(s):\n\n%s",
len(es), strings.Join(points, "\n"))
}
2015-08-05 00:19:05 +00:00
// RemoveAllocs is used to remove any allocs with the given IDs
// from the list of allocations
func RemoveAllocs(alloc []*Allocation, remove []*Allocation) []*Allocation {
2015-08-05 00:19:05 +00:00
// Convert remove into a set
removeSet := make(map[string]struct{})
for _, remove := range remove {
removeSet[remove.ID] = struct{}{}
2015-08-05 00:19:05 +00:00
}
n := len(alloc)
for i := 0; i < n; i++ {
if _, ok := removeSet[alloc[i].ID]; ok {
alloc[i], alloc[n-1] = alloc[n-1], nil
i--
n--
}
}
alloc = alloc[:n]
return alloc
}
2015-08-05 00:28:19 +00:00
// FilterTerminalAllocs filters out all allocations in a terminal state and
2016-08-31 21:06:31 +00:00
// returns the latest terminal allocations
func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) {
terminalAllocsByName := make(map[string]*Allocation)
2015-08-23 01:27:51 +00:00
n := len(allocs)
for i := 0; i < n; i++ {
if allocs[i].TerminalStatus() {
// Add the allocation to the terminal allocs map if it's not already
// added or has a higher create index than the one which is
// currently present.
alloc, ok := terminalAllocsByName[allocs[i].Name]
if !ok || alloc.CreateIndex < allocs[i].CreateIndex {
terminalAllocsByName[allocs[i].Name] = allocs[i]
}
// Remove the allocation
2015-08-23 01:27:51 +00:00
allocs[i], allocs[n-1] = allocs[n-1], nil
i--
n--
}
}
return allocs[:n], terminalAllocsByName
2015-08-23 01:27:51 +00:00
}
// AllocsFit checks if a given set of allocations will fit on a node.
// The netIdx can optionally be provided if its already been computed.
// If the netIdx is provided, it is assumed that the client has already
// ensured there are no collisions. If checkDevices is set to true, we check if
// there is a device oversubscription.
func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevices bool) (bool, string, *ComparableResources, error) {
2015-08-05 00:48:24 +00:00
// Compute the utilization from zero
2018-10-04 21:33:09 +00:00
used := new(ComparableResources)
2015-08-05 00:48:24 +00:00
// Add the reserved resources of the node
2018-10-02 20:36:04 +00:00
used.Add(node.ComparableReservedResources())
2015-08-05 00:48:24 +00:00
// For each alloc, add the resources
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
if alloc.TerminalStatus() {
continue
}
2018-10-02 20:36:04 +00:00
used.Add(alloc.ComparableResources())
2015-08-05 00:48:24 +00:00
}
// Check that the node resources are a super set of those
// that are being allocated
2018-10-02 20:36:04 +00:00
if superset, dimension := node.ComparableResources().Superset(used); !superset {
return false, dimension, used, nil
2015-08-05 00:48:24 +00:00
}
// Create the network index if missing
if netIdx == nil {
netIdx = NewNetworkIndex()
defer netIdx.Release()
if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
return false, "reserved port collision", used, nil
}
}
// Check if the network is overcommitted
if netIdx.Overcommitted() {
return false, "bandwidth exceeded", used, nil
2015-08-05 00:48:24 +00:00
}
// Check devices
if checkDevices {
accounter := NewDeviceAccounter(node)
if accounter.AddAllocs(allocs) {
return false, "device oversubscribed", used, nil
}
}
2015-08-05 00:48:24 +00:00
// Allocations fit!
return true, "", used, nil
2015-08-13 18:54:59 +00:00
}
// ScoreFit is used to score the fit based on the Google work published here:
// http://www.columbia.edu/~cs2035/courses/ieor4405.S13/datacenter_scheduling.ppt
// This is equivalent to their BestFit v3
2018-10-04 21:33:09 +00:00
func ScoreFit(node *Node, util *ComparableResources) float64 {
2018-10-02 20:36:04 +00:00
// COMPAT(0.11): Remove in 0.11
reserved := node.ComparableReservedResources()
res := node.ComparableResources()
2015-08-13 18:54:59 +00:00
// Determine the node availability
2018-10-02 20:36:04 +00:00
nodeCpu := float64(res.Flattened.Cpu.CpuShares)
nodeMem := float64(res.Flattened.Memory.MemoryMB)
if reserved != nil {
nodeCpu -= float64(reserved.Flattened.Cpu.CpuShares)
nodeMem -= float64(reserved.Flattened.Memory.MemoryMB)
2015-08-13 18:54:59 +00:00
}
// Compute the free percentage
2018-10-02 20:36:04 +00:00
freePctCpu := 1 - (float64(util.Flattened.Cpu.CpuShares) / nodeCpu)
freePctRam := 1 - (float64(util.Flattened.Memory.MemoryMB) / nodeMem)
2015-08-13 18:54:59 +00:00
// Total will be "maximized" the smaller the value is.
// At 100% utilization, the total is 2, while at 0% util it is 20.
total := math.Pow(10, freePctCpu) + math.Pow(10, freePctRam)
// Invert so that the "maximized" total represents a high-value
// score. Because the floor is 20, we simply use that as an anchor.
// This means at a perfect fit, we return 18 as the score.
score := 20.0 - total
// Bound the score, just in case
// If the score is over 18, that means we've overfit the node.
if score > 18.0 {
score = 18.0
} else if score < 0 {
score = 0
}
return score
2015-08-05 00:48:24 +00:00
}
2015-09-07 22:08:50 +00:00
2016-02-11 17:08:20 +00:00
func CopySliceConstraints(s []*Constraint) []*Constraint {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Constraint, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
2016-08-17 00:50:14 +00:00
2018-07-16 13:30:58 +00:00
func CopySliceAffinities(s []*Affinity) []*Affinity {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Affinity, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
2018-07-16 17:52:24 +00:00
func CopySliceSpreads(s []*Spread) []*Spread {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Spread, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
l := len(s)
if l == 0 {
return nil
}
c := make([]*SpreadTarget, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta {
l := len(s)
if l == 0 {
return nil
}
c := make([]*NodeScoreMeta, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopyScalingPolicy(p *ScalingPolicy) *ScalingPolicy {
if p == nil {
return nil
}
c := ScalingPolicy{
ID: p.ID,
Namespace: p.Namespace,
Target: p.Target,
JobID: p.JobID,
Policy: p.Policy,
Enabled: p.Enabled,
CreateIndex: p.CreateIndex,
ModifyIndex: p.ModifyIndex,
}
return &c
}
2016-08-17 00:50:14 +00:00
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
2016-08-18 17:50:47 +00:00
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
2016-08-17 00:50:14 +00:00
set := make(map[string]struct{})
for _, tgp := range policies {
for _, tp := range tgp {
2016-08-18 17:50:47 +00:00
for _, p := range tp.Policies {
2016-08-17 00:50:14 +00:00
set[p] = struct{}{}
}
}
}
flattened := make([]string, 0, len(set))
for p := range set {
flattened = append(flattened, p)
}
return flattened
}
// DenormalizeAllocationJobs is used to attach a job to all allocations that are
// non-terminal and do not have a job already. This is useful in cases where the
// job is normalized.
func DenormalizeAllocationJobs(job *Job, allocs []*Allocation) {
if job != nil {
for _, alloc := range allocs {
if alloc.Job == nil && !alloc.TerminalStatus() {
alloc.Job = job
}
}
}
}
2017-05-31 18:34:46 +00:00
// AllocName returns the name of the allocation given the input.
func AllocName(job, group string, idx uint) string {
return fmt.Sprintf("%s.%s[%d]", job, group, idx)
}
// ACLPolicyListHash returns a consistent hash for a set of policies.
func ACLPolicyListHash(policies []*ACLPolicy) string {
cacheKeyHash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}
for _, policy := range policies {
cacheKeyHash.Write([]byte(policy.Name))
binary.Write(cacheKeyHash, binary.BigEndian, policy.ModifyIndex)
}
cacheKey := string(cacheKeyHash.Sum(nil))
return cacheKey
}
2017-08-20 21:30:27 +00:00
// CompileACLObject compiles a set of ACL policies into an ACL object with a cache
func CompileACLObject(cache *lru.TwoQueueCache, policies []*ACLPolicy) (*acl.ACL, error) {
// Sort the policies to ensure consistent ordering
2017-08-23 20:49:08 +00:00
sort.Slice(policies, func(i, j int) bool {
return policies[i].Name < policies[j].Name
})
2017-08-20 21:30:27 +00:00
// Determine the cache key
cacheKey := ACLPolicyListHash(policies)
aclRaw, ok := cache.Get(cacheKey)
if ok {
return aclRaw.(*acl.ACL), nil
}
// Parse the policies
parsed := make([]*acl.Policy, 0, len(policies))
for _, policy := range policies {
p, err := acl.Parse(policy.Rules)
if err != nil {
return nil, fmt.Errorf("failed to parse %q: %v", policy.Name, err)
}
parsed = append(parsed, p)
}
// Create the ACL object
aclObj, err := acl.NewACL(false, parsed)
if err != nil {
return nil, fmt.Errorf("failed to construct ACL: %v", err)
}
// Update the cache
cache.Add(cacheKey, aclObj)
return aclObj, nil
}
2018-01-12 21:58:44 +00:00
// GenerateMigrateToken will create a token for a client to access an
// authenticated volume of another client to migrate data for sticky volumes.
func GenerateMigrateToken(allocID, nodeSecretID string) (string, error) {
h, err := blake2b.New512([]byte(nodeSecretID))
if err != nil {
return "", err
}
h.Write([]byte(allocID))
return base64.URLEncoding.EncodeToString(h.Sum(nil)), nil
}
// CompareMigrateToken returns true if two migration tokens can be computed and
// are equal.
func CompareMigrateToken(allocID, nodeSecretID, otherMigrateToken string) bool {
h, err := blake2b.New512([]byte(nodeSecretID))
if err != nil {
return false
}
h.Write([]byte(allocID))
otherBytes, err := base64.URLEncoding.DecodeString(otherMigrateToken)
if err != nil {
return false
}
return subtle.ConstantTimeCompare(h.Sum(nil), otherBytes) == 1
}
2018-10-02 20:36:04 +00:00
// ParsePortRanges parses the passed port range string and returns a list of the
// ports. The specification is a comma separated list of either port numbers or
// port ranges. A port number is a single integer and a port range is two
// integers separated by a hyphen. As an example the following spec would
// convert to: ParsePortRanges("10,12-14,16") -> []uint64{10, 12, 13, 14, 16}
func ParsePortRanges(spec string) ([]uint64, error) {
parts := strings.Split(spec, ",")
// Hot path the empty case
if len(parts) == 1 && parts[0] == "" {
return nil, nil
}
ports := make(map[uint64]struct{})
for _, part := range parts {
part = strings.TrimSpace(part)
rangeParts := strings.Split(part, "-")
l := len(rangeParts)
switch l {
case 1:
if val := rangeParts[0]; val == "" {
return nil, fmt.Errorf("can't specify empty port")
} else {
port, err := strconv.ParseUint(val, 10, 0)
if err != nil {
return nil, err
}
ports[port] = struct{}{}
}
case 2:
// We are parsing a range
start, err := strconv.ParseUint(rangeParts[0], 10, 0)
if err != nil {
return nil, err
}
end, err := strconv.ParseUint(rangeParts[1], 10, 0)
if err != nil {
return nil, err
}
if end < start {
return nil, fmt.Errorf("invalid range: starting value (%v) less than ending (%v) value", end, start)
}
for i := start; i <= end; i++ {
ports[i] = struct{}{}
}
default:
return nil, fmt.Errorf("can only parse single port numbers or port ranges (ex. 80,100-120,150)")
}
}
var results []uint64
for port := range ports {
results = append(results, port)
}
sort.Slice(results, func(i, j int) bool {
return results[i] < results[j]
})
return results, nil
}