open-nomad/nomad/structs/network.go
Preetha Appan 99eca85206
Scheduler changes to support network at task group level
Also includes unit tests for binpacker and preemption.
The tests verify that network resources specified at the
task group level are properly accounted for
2019-07-31 01:04:08 -04:00

411 lines
10 KiB
Go

package structs
import (
"fmt"
"math/rand"
"net"
"sync"
)
const (
// MinDynamicPort is the smallest dynamic port generated
MinDynamicPort = 20000
// MaxDynamicPort is the largest dynamic port generated
MaxDynamicPort = 32000
// maxRandPortAttempts is the maximum number of attempt
// to assign a random port
maxRandPortAttempts = 20
// maxValidPort is the max valid port number
maxValidPort = 65536
)
var (
// bitmapPool is used to pool the bitmaps used for port collision
// checking. They are fairly large (8K) so we can re-use them to
// avoid GC pressure. Care should be taken to call Clear() on any
// bitmap coming from the pool.
bitmapPool = new(sync.Pool)
)
// NetworkIndex is used to index the available network resources
// and the used network resources on a machine given allocations
type NetworkIndex struct {
AvailNetworks []*NetworkResource // List of available networks
AvailBandwidth map[string]int // Bandwidth by device
UsedPorts map[string]Bitmap // Ports by IP
UsedBandwidth map[string]int // Bandwidth by device
}
// NewNetworkIndex is used to construct a new network index
func NewNetworkIndex() *NetworkIndex {
return &NetworkIndex{
AvailBandwidth: make(map[string]int),
UsedPorts: make(map[string]Bitmap),
UsedBandwidth: make(map[string]int),
}
}
// Release is called when the network index is no longer needed
// to attempt to re-use some of the memory it has allocated
func (idx *NetworkIndex) Release() {
for _, b := range idx.UsedPorts {
bitmapPool.Put(b)
}
}
// Overcommitted checks if the network is overcommitted
func (idx *NetworkIndex) Overcommitted() bool {
for device, used := range idx.UsedBandwidth {
avail := idx.AvailBandwidth[device]
if used > avail {
return true
}
}
return false
}
// SetNode is used to setup the available network resources. Returns
// true if there is a collision
func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
// COMPAT(0.11): Remove in 0.11
// Grab the network resources, handling both new and old
var networks []*NetworkResource
if node.NodeResources != nil && len(node.NodeResources.Networks) != 0 {
networks = node.NodeResources.Networks
} else if node.Resources != nil {
networks = node.Resources.Networks
}
// Add the available CIDR blocks
for _, n := range networks {
if n.Device != "" {
idx.AvailNetworks = append(idx.AvailNetworks, n)
idx.AvailBandwidth[n.Device] = n.MBits
}
}
// COMPAT(0.11): Remove in 0.11
// Handle reserving ports, handling both new and old
if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" {
collide = idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts)
} else if node.Reserved != nil {
for _, n := range node.Reserved.Networks {
if idx.AddReserved(n) {
collide = true
}
}
}
return
}
// AddAllocs is used to add the used network resources. Returns
// true if there is a collision
func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
if alloc.TerminalStatus() {
continue
}
if alloc.AllocatedResources != nil {
// Add network resources that are at the task group level
if len(alloc.AllocatedResources.Shared.Networks) > 0 {
for _, network := range alloc.AllocatedResources.Shared.Networks {
if idx.AddReserved(network) {
collide = true
}
}
}
for _, task := range alloc.AllocatedResources.Tasks {
if len(task.Networks) == 0 {
continue
}
n := task.Networks[0]
if idx.AddReserved(n) {
collide = true
}
}
} else {
// COMPAT(0.11): Remove in 0.11
for _, task := range alloc.TaskResources {
if len(task.Networks) == 0 {
continue
}
n := task.Networks[0]
if idx.AddReserved(n) {
collide = true
}
}
}
}
return
}
// AddReserved is used to add a reserved network usage, returns true
// if there is a port collision
func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
// Add the port usage
used := idx.UsedPorts[n.IP]
if used == nil {
// Try to get a bitmap from the pool, else create
raw := bitmapPool.Get()
if raw != nil {
used = raw.(Bitmap)
used.Clear()
} else {
used, _ = NewBitmap(maxValidPort)
}
idx.UsedPorts[n.IP] = used
}
for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} {
for _, port := range ports {
// Guard against invalid port
if port.Value < 0 || port.Value >= maxValidPort {
return true
}
if used.Check(uint(port.Value)) {
collide = true
} else {
used.Set(uint(port.Value))
}
}
}
// Add the bandwidth
idx.UsedBandwidth[n.Device] += n.MBits
return
}
// AddReservedPortRange marks the ports given as reserved on all network
// interfaces. The port format is comma delimited, with spans given as n1-n2
// (80,100-200,205)
func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) {
// Convert the ports into a slice of ints
resPorts, err := ParsePortRanges(ports)
if err != nil {
return
}
// Ensure we create a bitmap for each available network
for _, n := range idx.AvailNetworks {
used := idx.UsedPorts[n.IP]
if used == nil {
// Try to get a bitmap from the pool, else create
raw := bitmapPool.Get()
if raw != nil {
used = raw.(Bitmap)
used.Clear()
} else {
used, _ = NewBitmap(maxValidPort)
}
idx.UsedPorts[n.IP] = used
}
}
for _, used := range idx.UsedPorts {
for _, port := range resPorts {
// Guard against invalid port
if port < 0 || port >= maxValidPort {
return true
}
if used.Check(uint(port)) {
collide = true
} else {
used.Set(uint(port))
}
}
}
return
}
// yieldIP is used to iteratively invoke the callback with
// an available IP
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
inc := func(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
for _, n := range idx.AvailNetworks {
ip, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
if cb(n, ip) {
return
}
}
}
}
// AssignNetwork is used to assign network resources given an ask.
// If the ask cannot be satisfied, returns nil
func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
err = fmt.Errorf("no networks available")
idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
// Convert the IP to a string
ipStr := ip.String()
// Check if we would exceed the bandwidth cap
availBandwidth := idx.AvailBandwidth[n.Device]
usedBandwidth := idx.UsedBandwidth[n.Device]
if usedBandwidth+ask.MBits > availBandwidth {
err = fmt.Errorf("bandwidth exceeded")
return
}
used := idx.UsedPorts[ipStr]
// Check if any of the reserved ports are in use
for _, port := range ask.ReservedPorts {
// Guard against invalid port
if port.Value < 0 || port.Value >= maxValidPort {
err = fmt.Errorf("invalid port %d (out of range)", port.Value)
return
}
// Check if in use
if used != nil && used.Check(uint(port.Value)) {
err = fmt.Errorf("reserved port collision")
return
}
}
// Create the offer
offer := &NetworkResource{
Device: n.Device,
IP: ipStr,
MBits: ask.MBits,
ReservedPorts: ask.ReservedPorts,
DynamicPorts: ask.DynamicPorts,
}
// Try to stochastically pick the dynamic ports as it is faster and
// lower memory usage.
var dynPorts []int
var dynErr error
dynPorts, dynErr = getDynamicPortsStochastic(used, ask)
if dynErr == nil {
goto BUILD_OFFER
}
// Fall back to the precise method if the random sampling failed.
dynPorts, dynErr = getDynamicPortsPrecise(used, ask)
if dynErr != nil {
err = dynErr
return
}
BUILD_OFFER:
for i, port := range dynPorts {
offer.DynamicPorts[i].Value = port
}
// Stop, we have an offer!
out = offer
err = nil
return true
})
return
}
// getDynamicPortsPrecise takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fullfil the ask's DynamicPorts or an error if it failed. An error
// means the ask can not be satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
// Create a copy of the used ports and apply the new reserves
var usedSet Bitmap
var err error
if nodeUsed != nil {
usedSet, err = nodeUsed.Copy()
if err != nil {
return nil, err
}
} else {
usedSet, err = NewBitmap(maxValidPort)
if err != nil {
return nil, err
}
}
for _, port := range ask.ReservedPorts {
usedSet.Set(uint(port.Value))
}
// Get the indexes of the unset
availablePorts := usedSet.IndexesInRange(false, MinDynamicPort, MaxDynamicPort)
// Randomize the amount we need
numDyn := len(ask.DynamicPorts)
if len(availablePorts) < numDyn {
return nil, fmt.Errorf("dynamic port selection failed")
}
numAvailable := len(availablePorts)
for i := 0; i < numDyn; i++ {
j := rand.Intn(numAvailable)
availablePorts[i], availablePorts[j] = availablePorts[j], availablePorts[i]
}
return availablePorts[:numDyn], nil
}
// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil if
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fullfil the ask's DynamicPorts or an error if it failed. An error
// does not mean the ask can not be satisfied as the method has a fixed amount
// of random probes and if these fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
var reserved, dynamic []int
for _, port := range ask.ReservedPorts {
reserved = append(reserved, port.Value)
}
for i := 0; i < len(ask.DynamicPorts); i++ {
attempts := 0
PICK:
attempts++
if attempts > maxRandPortAttempts {
return nil, fmt.Errorf("stochastic dynamic port selection failed")
}
randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort)
if nodeUsed != nil && nodeUsed.Check(uint(randPort)) {
goto PICK
}
for _, ports := range [][]int{reserved, dynamic} {
if isPortReserved(ports, randPort) {
goto PICK
}
}
dynamic = append(dynamic, randPort)
}
return dynamic, nil
}
// IntContains scans an integer slice for a value
func isPortReserved(haystack []int, needle int) bool {
for _, item := range haystack {
if item == needle {
return true
}
}
return false
}