9e96971a75
* cli: display group ports and address in alloc status command output * add assertions for port.To = -1 case and convert assertions to testify
418 lines
11 KiB
Go
418 lines
11 KiB
Go
package structs
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"sync"
|
|
)
|
|
|
|
const (
|
|
// MinDynamicPort is the smallest dynamic port generated
|
|
MinDynamicPort = 20000
|
|
|
|
// MaxDynamicPort is the largest dynamic port generated
|
|
MaxDynamicPort = 32000
|
|
|
|
// maxRandPortAttempts is the maximum number of attempt
|
|
// to assign a random port
|
|
maxRandPortAttempts = 20
|
|
|
|
// maxValidPort is the max valid port number
|
|
maxValidPort = 65536
|
|
)
|
|
|
|
var (
|
|
// bitmapPool is used to pool the bitmaps used for port collision
|
|
// checking. They are fairly large (8K) so we can re-use them to
|
|
// avoid GC pressure. Care should be taken to call Clear() on any
|
|
// bitmap coming from the pool.
|
|
bitmapPool = new(sync.Pool)
|
|
)
|
|
|
|
// NetworkIndex is used to index the available network resources
|
|
// and the used network resources on a machine given allocations
|
|
type NetworkIndex struct {
|
|
AvailNetworks []*NetworkResource // List of available networks
|
|
AvailBandwidth map[string]int // Bandwidth by device
|
|
UsedPorts map[string]Bitmap // Ports by IP
|
|
UsedBandwidth map[string]int // Bandwidth by device
|
|
}
|
|
|
|
// NewNetworkIndex is used to construct a new network index
|
|
func NewNetworkIndex() *NetworkIndex {
|
|
return &NetworkIndex{
|
|
AvailBandwidth: make(map[string]int),
|
|
UsedPorts: make(map[string]Bitmap),
|
|
UsedBandwidth: make(map[string]int),
|
|
}
|
|
}
|
|
|
|
// Release is called when the network index is no longer needed
|
|
// to attempt to re-use some of the memory it has allocated
|
|
func (idx *NetworkIndex) Release() {
|
|
for _, b := range idx.UsedPorts {
|
|
bitmapPool.Put(b)
|
|
}
|
|
}
|
|
|
|
// Overcommitted checks if the network is overcommitted
|
|
func (idx *NetworkIndex) Overcommitted() bool {
|
|
for device, used := range idx.UsedBandwidth {
|
|
avail := idx.AvailBandwidth[device]
|
|
if used > avail {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// SetNode is used to setup the available network resources. Returns
|
|
// true if there is a collision
|
|
func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
|
|
|
|
// COMPAT(0.11): Remove in 0.11
|
|
// Grab the network resources, handling both new and old
|
|
var networks []*NetworkResource
|
|
if node.NodeResources != nil && len(node.NodeResources.Networks) != 0 {
|
|
networks = node.NodeResources.Networks
|
|
} else if node.Resources != nil {
|
|
networks = node.Resources.Networks
|
|
}
|
|
|
|
// Add the available CIDR blocks
|
|
for _, n := range networks {
|
|
if n.Device != "" {
|
|
idx.AvailNetworks = append(idx.AvailNetworks, n)
|
|
idx.AvailBandwidth[n.Device] = n.MBits
|
|
}
|
|
}
|
|
|
|
// COMPAT(0.11): Remove in 0.11
|
|
// Handle reserving ports, handling both new and old
|
|
if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" {
|
|
collide = idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts)
|
|
} else if node.Reserved != nil {
|
|
for _, n := range node.Reserved.Networks {
|
|
if idx.AddReserved(n) {
|
|
collide = true
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// AddAllocs is used to add the used network resources. Returns
|
|
// true if there is a collision
|
|
func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
|
|
for _, alloc := range allocs {
|
|
// Do not consider the resource impact of terminal allocations
|
|
if alloc.TerminalStatus() {
|
|
continue
|
|
}
|
|
|
|
if alloc.AllocatedResources != nil {
|
|
// Add network resources that are at the task group level
|
|
if len(alloc.AllocatedResources.Shared.Networks) > 0 {
|
|
for _, network := range alloc.AllocatedResources.Shared.Networks {
|
|
if idx.AddReserved(network) {
|
|
collide = true
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, task := range alloc.AllocatedResources.Tasks {
|
|
if len(task.Networks) == 0 {
|
|
continue
|
|
}
|
|
n := task.Networks[0]
|
|
if idx.AddReserved(n) {
|
|
collide = true
|
|
}
|
|
}
|
|
} else {
|
|
// COMPAT(0.11): Remove in 0.11
|
|
for _, task := range alloc.TaskResources {
|
|
if len(task.Networks) == 0 {
|
|
continue
|
|
}
|
|
n := task.Networks[0]
|
|
if idx.AddReserved(n) {
|
|
collide = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// AddReserved is used to add a reserved network usage, returns true
|
|
// if there is a port collision
|
|
func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
|
|
// Add the port usage
|
|
used := idx.UsedPorts[n.IP]
|
|
if used == nil {
|
|
// Try to get a bitmap from the pool, else create
|
|
raw := bitmapPool.Get()
|
|
if raw != nil {
|
|
used = raw.(Bitmap)
|
|
used.Clear()
|
|
} else {
|
|
used, _ = NewBitmap(maxValidPort)
|
|
}
|
|
idx.UsedPorts[n.IP] = used
|
|
}
|
|
|
|
for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} {
|
|
for _, port := range ports {
|
|
// Guard against invalid port
|
|
if port.Value < 0 || port.Value >= maxValidPort {
|
|
return true
|
|
}
|
|
if used.Check(uint(port.Value)) {
|
|
collide = true
|
|
} else {
|
|
used.Set(uint(port.Value))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add the bandwidth
|
|
idx.UsedBandwidth[n.Device] += n.MBits
|
|
return
|
|
}
|
|
|
|
// AddReservedPortRange marks the ports given as reserved on all network
|
|
// interfaces. The port format is comma delimited, with spans given as n1-n2
|
|
// (80,100-200,205)
|
|
func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) {
|
|
// Convert the ports into a slice of ints
|
|
resPorts, err := ParsePortRanges(ports)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Ensure we create a bitmap for each available network
|
|
for _, n := range idx.AvailNetworks {
|
|
used := idx.UsedPorts[n.IP]
|
|
if used == nil {
|
|
// Try to get a bitmap from the pool, else create
|
|
raw := bitmapPool.Get()
|
|
if raw != nil {
|
|
used = raw.(Bitmap)
|
|
used.Clear()
|
|
} else {
|
|
used, _ = NewBitmap(maxValidPort)
|
|
}
|
|
idx.UsedPorts[n.IP] = used
|
|
}
|
|
}
|
|
|
|
for _, used := range idx.UsedPorts {
|
|
for _, port := range resPorts {
|
|
// Guard against invalid port
|
|
if port < 0 || port >= maxValidPort {
|
|
return true
|
|
}
|
|
if used.Check(uint(port)) {
|
|
collide = true
|
|
} else {
|
|
used.Set(uint(port))
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// yieldIP is used to iteratively invoke the callback with
|
|
// an available IP
|
|
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
|
|
inc := func(ip net.IP) {
|
|
for j := len(ip) - 1; j >= 0; j-- {
|
|
ip[j]++
|
|
if ip[j] > 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, n := range idx.AvailNetworks {
|
|
ip, ipnet, err := net.ParseCIDR(n.CIDR)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
|
|
if cb(n, ip) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// AssignNetwork is used to assign network resources given an ask.
|
|
// If the ask cannot be satisfied, returns nil
|
|
func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
|
|
err = fmt.Errorf("no networks available")
|
|
idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
|
|
// Convert the IP to a string
|
|
ipStr := ip.String()
|
|
|
|
// Check if we would exceed the bandwidth cap
|
|
availBandwidth := idx.AvailBandwidth[n.Device]
|
|
usedBandwidth := idx.UsedBandwidth[n.Device]
|
|
if usedBandwidth+ask.MBits > availBandwidth {
|
|
err = fmt.Errorf("bandwidth exceeded")
|
|
return
|
|
}
|
|
|
|
used := idx.UsedPorts[ipStr]
|
|
|
|
// Check if any of the reserved ports are in use
|
|
for _, port := range ask.ReservedPorts {
|
|
// Guard against invalid port
|
|
if port.Value < 0 || port.Value >= maxValidPort {
|
|
err = fmt.Errorf("invalid port %d (out of range)", port.Value)
|
|
return
|
|
}
|
|
|
|
// Check if in use
|
|
if used != nil && used.Check(uint(port.Value)) {
|
|
err = fmt.Errorf("reserved port collision")
|
|
return
|
|
}
|
|
}
|
|
|
|
// Create the offer
|
|
offer := &NetworkResource{
|
|
Mode: ask.Mode,
|
|
Device: n.Device,
|
|
IP: ipStr,
|
|
MBits: ask.MBits,
|
|
ReservedPorts: ask.ReservedPorts,
|
|
DynamicPorts: ask.DynamicPorts,
|
|
}
|
|
|
|
// Try to stochastically pick the dynamic ports as it is faster and
|
|
// lower memory usage.
|
|
var dynPorts []int
|
|
var dynErr error
|
|
dynPorts, dynErr = getDynamicPortsStochastic(used, ask)
|
|
if dynErr == nil {
|
|
goto BUILD_OFFER
|
|
}
|
|
|
|
// Fall back to the precise method if the random sampling failed.
|
|
dynPorts, dynErr = getDynamicPortsPrecise(used, ask)
|
|
if dynErr != nil {
|
|
err = dynErr
|
|
return
|
|
}
|
|
|
|
BUILD_OFFER:
|
|
for i, port := range dynPorts {
|
|
offer.DynamicPorts[i].Value = port
|
|
|
|
// This syntax allows you to set the mapped to port to the same port
|
|
// allocated by the scheduler on the host.
|
|
if offer.DynamicPorts[i].To == -1 {
|
|
offer.DynamicPorts[i].To = port
|
|
}
|
|
}
|
|
|
|
// Stop, we have an offer!
|
|
out = offer
|
|
err = nil
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
// getDynamicPortsPrecise takes the nodes used port bitmap which may be nil if
|
|
// no ports have been allocated yet, the network ask and returns a set of unused
|
|
// ports to fullfil the ask's DynamicPorts or an error if it failed. An error
|
|
// means the ask can not be satisfied as the method does a precise search.
|
|
func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
|
|
// Create a copy of the used ports and apply the new reserves
|
|
var usedSet Bitmap
|
|
var err error
|
|
if nodeUsed != nil {
|
|
usedSet, err = nodeUsed.Copy()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
usedSet, err = NewBitmap(maxValidPort)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
for _, port := range ask.ReservedPorts {
|
|
usedSet.Set(uint(port.Value))
|
|
}
|
|
|
|
// Get the indexes of the unset
|
|
availablePorts := usedSet.IndexesInRange(false, MinDynamicPort, MaxDynamicPort)
|
|
|
|
// Randomize the amount we need
|
|
numDyn := len(ask.DynamicPorts)
|
|
if len(availablePorts) < numDyn {
|
|
return nil, fmt.Errorf("dynamic port selection failed")
|
|
}
|
|
|
|
numAvailable := len(availablePorts)
|
|
for i := 0; i < numDyn; i++ {
|
|
j := rand.Intn(numAvailable)
|
|
availablePorts[i], availablePorts[j] = availablePorts[j], availablePorts[i]
|
|
}
|
|
|
|
return availablePorts[:numDyn], nil
|
|
}
|
|
|
|
// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil if
|
|
// no ports have been allocated yet, the network ask and returns a set of unused
|
|
// ports to fullfil the ask's DynamicPorts or an error if it failed. An error
|
|
// does not mean the ask can not be satisfied as the method has a fixed amount
|
|
// of random probes and if these fail, the search is aborted.
|
|
func getDynamicPortsStochastic(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
|
|
var reserved, dynamic []int
|
|
for _, port := range ask.ReservedPorts {
|
|
reserved = append(reserved, port.Value)
|
|
}
|
|
|
|
for i := 0; i < len(ask.DynamicPorts); i++ {
|
|
attempts := 0
|
|
PICK:
|
|
attempts++
|
|
if attempts > maxRandPortAttempts {
|
|
return nil, fmt.Errorf("stochastic dynamic port selection failed")
|
|
}
|
|
|
|
randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort)
|
|
if nodeUsed != nil && nodeUsed.Check(uint(randPort)) {
|
|
goto PICK
|
|
}
|
|
|
|
for _, ports := range [][]int{reserved, dynamic} {
|
|
if isPortReserved(ports, randPort) {
|
|
goto PICK
|
|
}
|
|
}
|
|
dynamic = append(dynamic, randPort)
|
|
}
|
|
|
|
return dynamic, nil
|
|
}
|
|
|
|
// IntContains scans an integer slice for a value
|
|
func isPortReserved(haystack []int, needle int) bool {
|
|
for _, item := range haystack {
|
|
if item == needle {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|