Merge pull request #43 from hashicorp/f-ports

Network offers and Dynamic Ports
Armon Dadgar 2015-09-13 17:10:41 -07:00
commit 95af1478b8
15 changed files with 904 additions and 168 deletions

View File

@ -20,10 +20,9 @@ func Node() *structs.Node {
IOPS: 150,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Public: true,
CIDR: "192.168.0.100/32",
ReservedPorts: []int{22},
MBits: 1000,
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
@ -31,6 +30,14 @@ func Node() *structs.Node {
CPU: 0.1,
MemoryMB: 256,
DiskMB: 4 * 1024,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
MBits: 1,
},
},
},
Links: map[string]string{
"consul": "foobar.dc1",
@ -75,6 +82,12 @@ func Job() *structs.Job {
Resources: &structs.Resources{
CPU: 0.5,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: 1,
},
},
},
},
},
@ -113,16 +126,30 @@ func Alloc() *structs.Allocation {
NodeID: "foo",
TaskGroup: "web",
Resources: &structs.Resources{
CPU: 1.0,
MemoryMB: 1024,
DiskMB: 1024,
IOPS: 10,
CPU: 0.5,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Public: true,
CIDR: "192.168.0.100/32",
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{12345},
MBits: 100,
DynamicPorts: 1,
},
},
},
TaskResources: map[string]*structs.Resources{
"web": &structs.Resources{
CPU: 0.5,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{5000},
MBits: 50,
DynamicPorts: 1,
},
},
},
},

View File

@ -194,6 +194,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
proposed = append(proposed, plan.NodeAllocation[nodeID]...)
// Check if these allocations fit
fit, _, err := structs.AllocsFit(node, proposed)
fit, _, err := structs.AllocsFit(node, proposed, nil)
return fit, err
}

View File

@ -41,32 +41,13 @@ func FilterTerminalAllocs(allocs []*Allocation) []*Allocation {
return allocs[:n]
}
// PortsOvercommited checks if any ports are over-committed.
// This does not handle CIDR subsets, and computes for the entire
// CIDR block currently.
func PortsOvercommited(r *Resources) bool {
for _, net := range r.Networks {
ports := make(map[int]struct{})
for _, port := range net.ReservedPorts {
if _, ok := ports[port]; ok {
return true
}
ports[port] = struct{}{}
}
}
return false
}
// AllocsFit checks if a given set of allocations will fit on a node
func AllocsFit(node *Node, allocs []*Allocation) (bool, *Resources, error) {
// AllocsFit checks if a given set of allocations will fit on a node.
// The netIdx can optionally be provided if it has already been computed.
// If the netIdx is provided, it is assumed that the client has already
// ensured there are no collisions.
func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, *Resources, error) {
// Compute the utilization from zero
used := new(Resources)
for _, net := range node.Resources.Networks {
used.Networks = append(used.Networks, &NetworkResource{
Public: net.Public,
CIDR: net.CIDR,
})
}
// Add the reserved resources of the node
if node.Reserved != nil {
@ -88,8 +69,16 @@ func AllocsFit(node *Node, allocs []*Allocation) (bool, *Resources, error) {
return false, used, nil
}
// Ensure ports are not over commited
if PortsOvercommited(used) {
// Create the network index if missing
if netIdx == nil {
netIdx = NewNetworkIndex()
if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
return false, used, nil
}
}
// Check if the network is overcommitted
if netIdx.Overcommitted() {
return false, used, nil
}
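
A minimal caller-side sketch of the new AllocsFit signature (checkFit is a hypothetical helper; node and allocs are assumed to be populated): passing nil lets AllocsFit build and collision-check the index itself, while passing a precomputed index skips that work and leaves collision checking to the caller.

package sketch

import "github.com/hashicorp/nomad/nomad/structs"

// checkFit demonstrates both call patterns of the new AllocsFit.
func checkFit(node *structs.Node, allocs []*structs.Allocation) (bool, error) {
	// Simple path: AllocsFit constructs the NetworkIndex and fails
	// fast on any reservation collision.
	fit, _, err := structs.AllocsFit(node, allocs, nil)
	if err != nil || !fit {
		return fit, err
	}

	// Reuse path: precompute the index once when fitting repeatedly
	// against the same node; collision checking is the caller's job.
	netIdx := structs.NewNetworkIndex()
	if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
		return false, nil
	}
	fit, _, err = structs.AllocsFit(node, allocs, netIdx)
	return fit, err
}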

View File

@ -39,25 +39,48 @@ func TestFilterTerminalALlocs(t *testing.T) {
}
}
func TestPortsOvercommitted(t *testing.T) {
r := &Resources{
Networks: []*NetworkResource{
&NetworkResource{
ReservedPorts: []int{22, 80},
},
&NetworkResource{
ReservedPorts: []int{22, 80},
func TestAllocsFit_PortsOvercommitted(t *testing.T) {
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
MBits: 100,
},
},
},
}
if PortsOvercommited(r) {
t.Fatalf("bad")
a1 := &Allocation{
TaskResources: map[string]*Resources{
"web": &Resources{
Networks: []*NetworkResource{
&NetworkResource{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []int{8000},
},
},
},
},
}
// Overcommit 22
r.Networks[1].ReservedPorts[1] = 22
if !PortsOvercommited(r) {
t.Fatalf("bad")
// Should fit one allocation
fit, _, err := AllocsFit(n, []*Allocation{a1}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !fit {
t.Fatalf("Bad")
}
// Should not fit second allocation
fit, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if fit {
t.Fatalf("Bad")
}
}
@ -82,7 +105,7 @@ func TestAllocsFit(t *testing.T) {
IOPS: 50,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []int{80},
},
@ -98,7 +121,7 @@ func TestAllocsFit(t *testing.T) {
IOPS: 50,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []int{8000},
},
@ -107,7 +130,7 @@ func TestAllocsFit(t *testing.T) {
}
// Should fit one allocation
fit, used, err := AllocsFit(n, []*Allocation{a1})
fit, used, err := AllocsFit(n, []*Allocation{a1}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -124,7 +147,7 @@ func TestAllocsFit(t *testing.T) {
}
// Should not fit second allocation
fit, used, err = AllocsFit(n, []*Allocation{a1, a1})
fit, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}

nomad/structs/network.go (new file, 202 lines)
View File

@ -0,0 +1,202 @@
package structs
import (
"fmt"
"math/rand"
"net"
)
const (
// MinDynamicPort is the smallest dynamic port generated
MinDynamicPort = 20000
// MaxDynamicPort is the largest dynamic port generated
MaxDynamicPort = 60000
// maxRandPortAttempts is the maximum number of attempts
// to assign a random port
maxRandPortAttempts = 20
)
// NetworkIndex is used to index the available network resources
// and the used network resources on a machine given allocations
type NetworkIndex struct {
AvailNetworks []*NetworkResource // List of available networks
AvailBandwidth map[string]int // Bandwidth by device
UsedPorts map[string]map[int]struct{} // Ports by IP
UsedBandwidth map[string]int // Bandwidth by device
}
// NewNetworkIndex is used to construct a new network index
func NewNetworkIndex() *NetworkIndex {
return &NetworkIndex{
AvailBandwidth: make(map[string]int),
UsedPorts: make(map[string]map[int]struct{}),
UsedBandwidth: make(map[string]int),
}
}
// Overcommitted checks if the network is overcommitted
func (idx *NetworkIndex) Overcommitted() bool {
for device, used := range idx.UsedBandwidth {
avail := idx.AvailBandwidth[device]
if used > avail {
return true
}
}
return false
}
// SetNode is used to set up the available network resources. Returns
// true if there is a collision
func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
// Add the available CIDR blocks
for _, n := range node.Resources.Networks {
if n.CIDR != "" {
idx.AvailNetworks = append(idx.AvailNetworks, n)
idx.AvailBandwidth[n.Device] = n.MBits
}
}
// Add the reserved resources
if r := node.Reserved; r != nil {
for _, n := range r.Networks {
if idx.AddReserved(n) {
collide = true
}
}
}
return
}
// AddAllocs is used to add the used network resources. Returns
// true if there is a collision
func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
for _, alloc := range allocs {
for _, task := range alloc.TaskResources {
if len(task.Networks) == 0 {
continue
}
n := task.Networks[0]
if idx.AddReserved(n) {
collide = true
}
}
}
return
}
// AddReserved is used to add a reserved network usage, returning true
// if there is a port collision
func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
// Add the port usage
used := idx.UsedPorts[n.IP]
if used == nil {
used = make(map[int]struct{})
idx.UsedPorts[n.IP] = used
}
for _, port := range n.ReservedPorts {
if _, ok := used[port]; ok {
collide = true
} else {
used[port] = struct{}{}
}
}
// Add the bandwidth
idx.UsedBandwidth[n.Device] += n.MBits
return
}
// yieldIP is used to iteratively invoke the callback with
// an available IP
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
inc := func(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
for _, n := range idx.AvailNetworks {
ip, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
if cb(n, ip) {
return
}
}
}
}
// AssignNetwork is used to assign network resources given an ask.
// If the ask cannot be satisfied, returns nil
func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
// Convert the IP to a string
ipStr := ip.String()
// Check if we would exceed the bandwidth cap
availBandwidth := idx.AvailBandwidth[n.Device]
usedBandwidth := idx.UsedBandwidth[n.Device]
if usedBandwidth+ask.MBits > availBandwidth {
err = fmt.Errorf("bandwidth exceeded")
return
}
// Check if any of the reserved ports are in use
for _, port := range ask.ReservedPorts {
if _, ok := idx.UsedPorts[ipStr][port]; ok {
err = fmt.Errorf("reserved port collision")
return
}
}
// Create the offer
offer := &NetworkResource{
Device: n.Device,
IP: ipStr,
ReservedPorts: ask.ReservedPorts,
}
// Check if we need to generate any ports
for i := 0; i < ask.DynamicPorts; i++ {
attempts := 0
PICK:
attempts++
if attempts > maxRandPortAttempts {
err = fmt.Errorf("dynamic port selection failed")
return
}
randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort)
if _, ok := idx.UsedPorts[ipStr][randPort]; ok {
goto PICK
}
if IntContains(offer.ReservedPorts, randPort) {
goto PICK
}
offer.ReservedPorts = append(offer.ReservedPorts, randPort)
}
// Stop, we have an offer!
out = offer
err = nil
return true
})
return
}
// IntContains scans an integer slice for a value
func IntContains(haystack []int, needle int) bool {
for _, item := range haystack {
if item == needle {
return true
}
}
return false
}
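
Taken together, the index is meant to be driven in a fixed order: SetNode, AddAllocs, then one AssignNetwork per ask, committing each offer with AddReserved. A hedged sketch of that lifecycle (offerNetwork is a hypothetical helper):

package sketch

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// offerNetwork builds an index for a node and asks for an offer with
// one reserved port and two dynamically generated ports.
func offerNetwork(node *structs.Node, allocs []*structs.Allocation) (*structs.NetworkResource, error) {
	idx := structs.NewNetworkIndex()

	// Register the node's networks and its reserved usage.
	if idx.SetNode(node) {
		return nil, fmt.Errorf("node reserved ports collide")
	}
	// Layer on what existing allocations already consume.
	if idx.AddAllocs(allocs) {
		return nil, fmt.Errorf("existing allocations collide")
	}

	ask := &structs.NetworkResource{
		ReservedPorts: []int{8080},
		DynamicPorts:  2,
		MBits:         50,
	}
	offer, err := idx.AssignNetwork(ask)
	if offer == nil {
		return nil, err
	}

	// Commit the offer so a later ask cannot be handed the same ports.
	idx.AddReserved(offer)
	return offer, nil
}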

View File

@ -0,0 +1,349 @@
package structs
import (
"net"
"reflect"
"testing"
)
func TestNetworkIndex_Overcommitted(t *testing.T) {
idx := NewNetworkIndex()
// Consume some network
reserved := &NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 505,
ReservedPorts: []int{8000, 9000},
}
collide := idx.AddReserved(reserved)
if collide {
t.Fatalf("bad")
}
if !idx.Overcommitted() {
t.Fatalf("have no resources")
}
// Add resources
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
}
idx.SetNode(n)
if idx.Overcommitted() {
t.Fatalf("have resources")
}
// Double up our usage
idx.AddReserved(reserved)
if !idx.Overcommitted() {
t.Fatalf("should be overcommitted")
}
}
func TestNetworkIndex_SetNode(t *testing.T) {
idx := NewNetworkIndex()
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
Reserved: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
MBits: 1,
},
},
},
}
collide := idx.SetNode(n)
if collide {
t.Fatalf("bad")
}
if len(idx.AvailNetworks) != 1 {
t.Fatalf("Bad")
}
if idx.AvailBandwidth["eth0"] != 1000 {
t.Fatalf("Bad")
}
if idx.UsedBandwidth["eth0"] != 1 {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][22]; !ok {
t.Fatalf("Bad")
}
}
func TestNetworkIndex_AddAllocs(t *testing.T) {
idx := NewNetworkIndex()
allocs := []*Allocation{
&Allocation{
TaskResources: map[string]*Resources{
"web": &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []int{8000, 9000},
},
},
},
},
},
&Allocation{
TaskResources: map[string]*Resources{
"api": &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []int{10000},
},
},
},
},
},
}
collide := idx.AddAllocs(allocs)
if collide {
t.Fatalf("bad")
}
if idx.UsedBandwidth["eth0"] != 70 {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][10000]; !ok {
t.Fatalf("Bad")
}
}
func TestNetworkIndex_AddReserved(t *testing.T) {
idx := NewNetworkIndex()
reserved := &NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []int{8000, 9000},
}
collide := idx.AddReserved(reserved)
if collide {
t.Fatalf("bad")
}
if idx.UsedBandwidth["eth0"] != 20 {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok {
t.Fatalf("Bad")
}
// Try to reserve the same network
collide = idx.AddReserved(reserved)
if !collide {
t.Fatalf("bad")
}
}
func TestNetworkIndex_yieldIP(t *testing.T) {
idx := NewNetworkIndex()
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
CIDR: "192.168.0.100/30",
MBits: 1000,
},
},
},
Reserved: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
MBits: 1,
},
},
},
}
idx.SetNode(n)
var out []string
idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
out = append(out, ip.String())
return
})
expect := []string{"192.168.0.100", "192.168.0.101",
"192.168.0.102", "192.168.0.103"}
if !reflect.DeepEqual(out, expect) {
t.Fatalf("bad: %v", out)
}
}
func TestNetworkIndex_AssignNetwork(t *testing.T) {
idx := NewNetworkIndex()
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
CIDR: "192.168.0.100/30",
MBits: 1000,
},
},
},
Reserved: &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
MBits: 1,
},
},
},
}
idx.SetNode(n)
allocs := []*Allocation{
&Allocation{
TaskResources: map[string]*Resources{
"web": &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []int{8000, 9000},
},
},
},
},
},
&Allocation{
TaskResources: map[string]*Resources{
"api": &Resources{
Networks: []*NetworkResource{
&NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []int{10000},
},
},
},
},
},
}
idx.AddAllocs(allocs)
// Ask for a reserved port
ask := &NetworkResource{
ReservedPorts: []int{8000},
}
offer, err := idx.AssignNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.101" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != 8000 {
t.Fatalf("bad: %#v", offer)
}
// Ask for dynamic ports
ask = &NetworkResource{
DynamicPorts: 3,
}
offer, err = idx.AssignNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 3 {
t.Fatalf("bad: %#v", offer)
}
// Ask for reserved + dynamic ports
ask = &NetworkResource{
ReservedPorts: []int{12345},
DynamicPorts: 3,
}
offer, err = idx.AssignNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 4 || offer.ReservedPorts[0] != 12345 {
t.Fatalf("bad: %#v", offer)
}
// Ask for too much bandwidth
ask = &NetworkResource{
MBits: 1000,
}
offer, err = idx.AssignNetwork(ask)
if err.Error() != "bandwidth exceeded" {
t.Fatalf("err: %v", err)
}
if offer != nil {
t.Fatalf("bad")
}
}
func TestIntContains(t *testing.T) {
l := []int{1, 2, 10, 20}
if IntContains(l, 50) {
t.Fatalf("bad")
}
if !IntContains(l, 20) {
t.Fatalf("bad")
}
if !IntContains(l, 1) {
t.Fatalf("bad")
}
}

View File

@ -538,12 +538,22 @@ type Resources struct {
Networks []*NetworkResource
}
// NetIndexByCIDR scans the list of networks for a matching
// CIDR, returning the index. This currently ONLY handles
// an exact match and not a subset CIDR.
func (r *Resources) NetIndexByCIDR(cidr string) int {
// Copy returns a deep copy of the resources
func (r *Resources) Copy() *Resources {
newR := new(Resources)
*newR = *r
n := len(r.Networks)
newR.Networks = make([]*NetworkResource, n)
for i := 0; i < n; i++ {
newR.Networks[i] = r.Networks[i].Copy()
}
return newR
}
// NetIndex finds the matching net index using device name
func (r *Resources) NetIndex(n *NetworkResource) int {
for idx, net := range r.Networks {
if net.CIDR == cidr {
if net.Device == n.Device {
return idx
}
}
@ -551,7 +561,8 @@ func (r *Resources) NetIndexByCIDR(cidr string) int {
}
// Superset checks if one set of resources is a superset
// of another.
// of another. This ignores network resources, and the NetworkIndex
// should be used for that.
func (r *Resources) Superset(other *Resources) bool {
if r.CPU < other.CPU {
return false
@ -565,21 +576,6 @@ func (r *Resources) Superset(other *Resources) bool {
if r.IOPS < other.IOPS {
return false
}
for _, net := range r.Networks {
idx := other.NetIndexByCIDR(net.CIDR)
if idx >= 0 {
if net.MBits < other.Networks[idx].MBits {
return false
}
}
}
// Check that other does not have a network we are missing
for _, net := range other.Networks {
idx := r.NetIndexByCIDR(net.CIDR)
if idx == -1 {
return false
}
}
return true
}
@ -594,12 +590,14 @@ func (r *Resources) Add(delta *Resources) error {
r.DiskMB += delta.DiskMB
r.IOPS += delta.IOPS
for _, net := range delta.Networks {
idx := r.NetIndexByCIDR(net.CIDR)
for _, n := range delta.Networks {
// Find the matching interface by device name
idx := r.NetIndex(n)
if idx == -1 {
return fmt.Errorf("missing network for CIDR %s", net.CIDR)
r.Networks = append(r.Networks, n.Copy())
} else {
r.Networks[idx].Add(n)
}
r.Networks[idx].Add(net)
}
return nil
}
@ -607,10 +605,21 @@ func (r *Resources) Add(delta *Resources) error {
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
Public bool // Is this a public address?
Device string // Name of the device
CIDR string // CIDR block of addresses
IP string // IP address
ReservedPorts []int // Reserved ports
MBits int // Throughput
DynamicPorts int // Dynamically assigned ports
}
// Copy returns a deep copy of the network resource
func (n *NetworkResource) Copy() *NetworkResource {
newR := new(NetworkResource)
*newR = *n
newR.ReservedPorts = make([]int, len(n.ReservedPorts))
copy(newR.ReservedPorts, n.ReservedPorts)
return newR
}
// Add adds the resources of the delta to this, potentially
@ -620,13 +629,13 @@ func (n *NetworkResource) Add(delta *NetworkResource) {
n.ReservedPorts = append(n.ReservedPorts, delta.ReservedPorts...)
}
n.MBits += delta.MBits
n.DynamicPorts += delta.DynamicPorts
}
const (
// JobTypeCore is reserved for internal system tasks and is
// always handled by the CoreScheduler.
JobTypeCore = "_core"
JobTypeSystem = "system"
JobTypeService = "service"
JobTypeBatch = "batch"
)
@ -871,10 +880,14 @@ type Allocation struct {
// TaskGroup is the name of the task group that should be run
TaskGroup string
// Resources is the set of resources allocated as part
// Resources is the total set of resources allocated as part
// of this allocation of the task group.
Resources *Resources
// TaskResources is the set of resources allocated to each
// task. These should sum to the total Resources.
TaskResources map[string]*Resources
// Metrics associated with this allocation
Metrics *AllocMetric
@ -964,6 +977,9 @@ type AllocMetric struct {
// ClassExhausted is the number of nodes exhausted by class
ClassExhausted map[string]int
// DimensionExhaused provides the count by dimension or reason
DimensionExhaused map[string]int
// Scores is the scores of the final few nodes remaining
// for placement. The top score is typically selected.
Scores map[string]float64
@ -999,7 +1015,7 @@ func (a *AllocMetric) FilterNode(node *Node, constraint string) {
}
}
func (a *AllocMetric) ExhaustedNode(node *Node) {
func (a *AllocMetric) ExhaustedNode(node *Node, dimension string) {
a.NodesExhausted += 1
if node != nil && node.NodeClass != "" {
if a.ClassExhausted == nil {
@ -1007,6 +1023,12 @@ func (a *AllocMetric) ExhaustedNode(node *Node) {
}
a.ClassExhausted[node.NodeClass] += 1
}
if dimension != "" {
if a.DimensionExhaused == nil {
a.DimensionExhaused = make(map[string]int)
}
a.DimensionExhaused[dimension] += 1
}
}
func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
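
With NetIndexByCIDR replaced by the device-based NetIndex, Resources.Add now merges rather than errors: a delta network matching an existing Device is summed in place, and an unmatched one is deep-copied and appended. A small sketch (sumTaskNetworks is hypothetical) of accumulating per-task asks the way the bin-packer does:

package sketch

import "github.com/hashicorp/nomad/nomad/structs"

// sumTaskNetworks folds per-task resources into one total. Networks on
// the same device have MBits and DynamicPorts summed and ReservedPorts
// appended; networks on new devices are copied in.
func sumTaskNetworks(taskResources []*structs.Resources) (*structs.Resources, error) {
	total := new(structs.Resources)
	for _, tr := range taskResources {
		if err := total.Add(tr); err != nil {
			return nil, err
		}
	}
	return total, nil
}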

View File

@ -5,20 +5,21 @@ import (
"testing"
)
func TestResource_NetIndexByCIDR(t *testing.T) {
func TestResource_NetIndex(t *testing.T) {
r := &Resources{
Networks: []*NetworkResource{
&NetworkResource{CIDR: "10.0.0.0/8"},
&NetworkResource{CIDR: "127.0.0.0/24"},
&NetworkResource{Device: "eth0"},
&NetworkResource{Device: "lo0"},
&NetworkResource{Device: ""},
},
}
if idx := r.NetIndexByCIDR("10.0.0.0/8"); idx != 0 {
if idx := r.NetIndex(&NetworkResource{Device: "eth0"}); idx != 0 {
t.Fatalf("Bad: %d", idx)
}
if idx := r.NetIndexByCIDR("127.0.0.0/24"); idx != 1 {
if idx := r.NetIndex(&NetworkResource{Device: "lo0"}); idx != 1 {
t.Fatalf("Bad: %d", idx)
}
if idx := r.NetIndexByCIDR("10.0.0.0/16"); idx != -1 {
if idx := r.NetIndex(&NetworkResource{Device: "eth1"}); idx != -1 {
t.Fatalf("Bad: %d", idx)
}
}
@ -29,24 +30,12 @@ func TestResource_Superset(t *testing.T) {
MemoryMB: 2048,
DiskMB: 10000,
IOPS: 100,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
MBits: 100,
},
},
}
r2 := &Resources{
CPU: 1.0,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
MBits: 50,
},
},
}
if !r1.Superset(r1) {
@ -84,7 +73,7 @@ func TestResource_Add(t *testing.T) {
IOPS: 50,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "10.0.0.0/8",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []int{80},
},
@ -115,6 +104,48 @@ func TestResource_Add(t *testing.T) {
}
}
func TestResource_Add_Network(t *testing.T) {
r1 := &Resources{}
r2 := &Resources{
Networks: []*NetworkResource{
&NetworkResource{
MBits: 50,
DynamicPorts: 2,
},
},
}
r3 := &Resources{
Networks: []*NetworkResource{
&NetworkResource{
MBits: 25,
DynamicPorts: 1,
},
},
}
err := r1.Add(r2)
if err != nil {
t.Fatalf("Err: %v", err)
}
err = r1.Add(r3)
if err != nil {
t.Fatalf("Err: %v", err)
}
expect := &Resources{
Networks: []*NetworkResource{
&NetworkResource{
MBits: 75,
DynamicPorts: 3,
},
},
}
if !reflect.DeepEqual(expect.Networks, r1.Networks) {
t.Fatalf("bad: %#v %#v", expect, r1)
}
}
func TestEncodeDecode(t *testing.T) {
type FooRequest struct {
Foo string

View File

@ -311,6 +311,15 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
continue
}
// Restore the network offers from the existing allocation.
// We do not allow network resources (reserved/dynamic ports)
// to be updated. This is guarded in taskUpdated, so we can
// safely restore those here.
for task, resources := range option.TaskResources {
existing := update.Alloc.TaskResources[task]
resources.Networks = existing.Networks
}
// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *update.Alloc
@ -319,6 +328,7 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
newAlloc.EvalID = s.eval.ID
newAlloc.Job = s.job
newAlloc.Resources = size
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = s.ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
newAlloc.ClientStatus = structs.AllocClientStatusPending
@ -361,36 +371,29 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Attempt to match the task group
option, size := s.stack.Select(missing.TaskGroup)
// Handle a placement failure
var nodeID, status, desc, clientStatus string
if option == nil {
status = structs.AllocDesiredStatusFailed
desc = "failed to find a node for placement"
clientStatus = structs.AllocClientStatusFailed
} else {
nodeID = option.Node.ID
status = structs.AllocDesiredStatusRun
clientStatus = structs.AllocClientStatusPending
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
NodeID: nodeID,
JobID: s.job.ID,
Job: s.job,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
DesiredStatus: status,
DesiredDescription: desc,
ClientStatus: clientStatus,
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
Job: s.job,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
}
if nodeID != "" {
// Set fields based on if we found an allocation option
if option != nil {
alloc.NodeID = option.Node.ID
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
}

View File

@ -382,6 +382,15 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Verify the network did not change
for _, alloc := range out {
for _, resources := range alloc.TaskResources {
if resources.Networks[0].ReservedPorts[0] != 5000 {
t.Fatalf("bad: %#v", alloc)
}
}
}
}
func TestServiceSched_JobDeregister(t *testing.T) {

View File

@ -10,8 +10,9 @@ import (
// along with a node when iterating. This state can be modified as
// various rank methods are applied.
type RankedNode struct {
Node *structs.Node
Score float64
Node *structs.Node
Score float64
TaskResources map[string]*structs.Resources
// Allocs is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
@ -35,6 +36,14 @@ func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error)
return p, nil
}
func (r *RankedNode) SetTaskResources(task *structs.Task,
resource *structs.Resources) {
if r.TaskResources == nil {
r.TaskResources = make(map[string]*structs.Resources)
}
r.TaskResources[task.Name] = resource
}
// RankFeasibleIterator is used to iteratively yield nodes along
// with ranking metadata. The iterators may manage some state for
// performance optimizations.
@ -122,35 +131,35 @@ func (iter *StaticRankIterator) Reset() {
// BinPackIterator is a RankIterator that scores potential options
// based on a bin-packing algorithm.
type BinPackIterator struct {
ctx Context
source RankIterator
resources *structs.Resources
evict bool
priority int
ctx Context
source RankIterator
evict bool
priority int
tasks []*structs.Task
}
// NewBinPackIterator returns a BinPackIterator which tries to fit the given
// resources, potentially evicting other tasks based on a given priority.
func NewBinPackIterator(ctx Context, source RankIterator, resources *structs.Resources, evict bool, priority int) *BinPackIterator {
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks,
// potentially evicting other tasks based on a given priority.
func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
iter := &BinPackIterator{
ctx: ctx,
source: source,
resources: resources,
evict: evict,
priority: priority,
ctx: ctx,
source: source,
evict: evict,
priority: priority,
}
return iter
}
func (iter *BinPackIterator) SetResources(r *structs.Resources) {
iter.resources = r
}
func (iter *BinPackIterator) SetPriority(p int) {
iter.priority = p
}
func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) {
iter.tasks = tasks
}
func (iter *BinPackIterator) Next() *RankedNode {
OUTER:
for {
// Get the next potential option
option := iter.source.Next()
@ -167,13 +176,47 @@ func (iter *BinPackIterator) Next() *RankedNode {
continue
}
// Index the existing network usage
netIdx := structs.NewNetworkIndex()
netIdx.SetNode(option.Node)
netIdx.AddAllocs(proposed)
// Assign the resources for each task
total := new(structs.Resources)
for _, task := range iter.tasks {
taskResources := task.Resources.Copy()
// Check if we need a network resource
if len(taskResources.Networks) > 0 {
ask := taskResources.Networks[0]
offer, err := netIdx.AssignNetwork(ask)
if offer == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
continue OUTER
}
// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)
// Update the network ask to the offer
taskResources.Networks = []*structs.NetworkResource{offer}
}
// Store the task resource
option.SetTaskResources(task, taskResources)
// Accumulate the total resource requirement
total.Add(taskResources)
}
// Add the resources we are trying to fit
proposed = append(proposed, &structs.Allocation{Resources: iter.resources})
proposed = append(proposed, &structs.Allocation{Resources: total})
// Check if these allocations fit; if they do not, simply skip this node
fit, util, _ := structs.AllocsFit(option.Node, proposed)
fit, util, _ := structs.AllocsFit(option.Node, proposed, netIdx)
if !fit {
iter.ctx.Metrics().ExhaustedNode(option.Node)
iter.ctx.Metrics().ExhaustedNode(option.Node, "resources")
continue
}
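
A hedged sketch of the resulting caller flow (collectNodes is a hypothetical helper in package scheduler; ctx and source are assumed to be wired up as in the stack): the iterator now receives the task list instead of a single summed Resources, and each yielded option carries the concrete per-task network offers reserved while scoring.

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// collectNodes drains a BinPackIterator configured with the new
// task-based API.
func collectNodes(ctx Context, source RankIterator, tasks []*structs.Task) []*RankedNode {
	binp := NewBinPackIterator(ctx, source, false, 0)
	binp.SetTasks(tasks)

	var out []*RankedNode
	for option := binp.Next(); option != nil; option = binp.Next() {
		// option.TaskResources maps task name to the offer, including
		// the assigned IP and any generated dynamic ports.
		out = append(out, option)
	}
	return out
}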

View File

@ -68,11 +68,16 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 2 {
@ -137,11 +142,16 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
},
}
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 1 {
@ -207,11 +217,16 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
}
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 1 {
@ -280,11 +295,16 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
plan := ctx.Plan()
plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
resources := &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
}
binp := NewBinPackIterator(ctx, static, resources, false, 0)
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
out := collectRanked(binp)
if len(out) != 2 {

View File

@ -76,7 +76,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi
// by a particular task group. Only enable eviction for the service
// scheduler as that logic is expensive.
evict := !batch
s.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0)
s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job. The penalty
@ -149,11 +149,18 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(drivers)
s.taskGroupConstraint.SetConstraints(constr)
s.binPack.SetResources(size)
s.binPack.SetTasks(tg.Tasks)
// Find the node with the max score
option := s.maxScore.Next()
// Ensure that the task resources were specified
if option != nil && len(option.TaskResources) != len(tg.Tasks) {
for _, task := range tg.Tasks {
option.SetTaskResources(task, task.Resources)
}
}
// Store the compute time
s.ctx.Metrics().AllocationTime = time.Since(start)
return option, size

View File

@ -227,6 +227,11 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
if !reflect.DeepEqual(at.Config, bt.Config) {
return true
}
// Inspect the network to see if the resource ask is different
if !reflect.DeepEqual(at.Resources.Networks, bt.Resources.Networks) {
return true
}
}
return false
}

View File

@ -259,4 +259,10 @@ func TestTasksUpdated(t *testing.T) {
if !tasksUpdated(j1.TaskGroups[0], j5.TaskGroups[0]) {
t.Fatalf("bad")
}
j6 := mock.Job()
j6.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = 3
if !tasksUpdated(j1.TaskGroups[0], j6.TaskGroups[0]) {
t.Fatalf("bad")
}
}