scheduler: adding helper library for network assignments

This commit is contained in:
Armon Dadgar 2015-09-12 19:34:46 -07:00
parent a829c8db50
commit 5c8f1c0fa5
3 changed files with 387 additions and 4 deletions

View file

@ -20,10 +20,9 @@ func Node() *structs.Node {
IOPS: 150,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
CIDR: "192.168.0.100/32",
ReservedPorts: []int{22},
MBits: 1000,
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
@ -31,6 +30,14 @@ func Node() *structs.Node {
CPU: 0.1,
MemoryMB: 256,
DiskMB: 4 * 1024,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
MBits: 1,
},
},
},
Links: map[string]string{
"consul": "foobar.dc1",

173
scheduler/network.go Normal file
View file

@ -0,0 +1,173 @@
package scheduler
import (
"math/rand"
"net"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// MinDynamicPort is the smallest dynamic port generated
MinDynamicPort = 20000
// MaxDynamicPort is the largest dynamic port generated
MaxDynamicPort = 60000
// maxRandPortAttempts is the maximum number of attempt
// to assign a random port
maxRandPortAttempts = 20
)
// NetworkIndex is used to index the available network resources
// and the used network resources on a machine given allocations
type NetworkIndex struct {
AvailNetworks []*structs.NetworkResource // List of available networks
AvailBandwidth map[string]int // Bandwidth by device
UsedPorts map[string]map[int]struct{} // Ports by IP
UsedBandwidth map[string]int // Bandwidth by device
}
// NewNetworkIndex is used to construct a new network index
func NewNetworkIndex() *NetworkIndex {
return &NetworkIndex{
AvailBandwidth: make(map[string]int),
UsedPorts: make(map[string]map[int]struct{}),
UsedBandwidth: make(map[string]int),
}
}
// SetNode is used to setup the available network resources
func (idx *NetworkIndex) SetNode(node *structs.Node) {
// Add the available CIDR blocks
for _, n := range node.Resources.Networks {
if n.CIDR != "" {
idx.AvailNetworks = append(idx.AvailNetworks, n)
idx.AvailBandwidth[n.Device] = n.MBits
}
}
// Add the reserved resources
if r := node.Reserved; r != nil {
for _, n := range r.Networks {
idx.addReserved(n)
}
}
}
// AddAllocs is used to add the used network resources
func (idx *NetworkIndex) AddAllocs(allocs []*structs.Allocation) {
for _, alloc := range allocs {
for _, task := range alloc.TaskResources {
if len(task.Networks) == 0 {
continue
}
n := task.Networks[0]
idx.addReserved(n)
}
}
}
// addReserved is used to add a reserved network usage
func (idx *NetworkIndex) addReserved(n *structs.NetworkResource) {
// Add the port usage
used := idx.UsedPorts[n.IP]
if used == nil {
used = make(map[int]struct{})
idx.UsedPorts[n.IP] = used
}
for _, port := range n.ReservedPorts {
used[port] = struct{}{}
}
// Add the bandwidth
idx.UsedBandwidth[n.Device] += n.MBits
}
// yieldIP is used to iteratively invoke the callback with
// an available IP
func (idx *NetworkIndex) yieldIP(cb func(net *structs.NetworkResource, ip net.IP) bool) {
inc := func(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
for _, n := range idx.AvailNetworks {
ip, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
if cb(n, ip) {
return
}
}
}
}
// AssignNetwork is used to assign network resources given an ask
func (idx *NetworkIndex) AssignNetwork(ask *structs.NetworkResource) (out *structs.NetworkResource) {
idx.yieldIP(func(n *structs.NetworkResource, ip net.IP) (stop bool) {
// Convert the IP to a string
ipStr := ip.String()
// Check if we would exceed the bandwidth cap
availBandwidth := idx.AvailBandwidth[n.Device]
usedBandwidth := idx.UsedBandwidth[n.Device]
if usedBandwidth+ask.MBits > availBandwidth {
return
}
// Check if any of the reserved ports are in use
for _, port := range ask.ReservedPorts {
if _, ok := idx.UsedPorts[ipStr][port]; ok {
return false
}
}
// Create the offer
offer := &structs.NetworkResource{
Device: n.Device,
IP: ipStr,
ReservedPorts: ask.ReservedPorts,
}
// Check if we need to generate any ports
for i := 0; i < ask.DynamicPorts; i++ {
attempts := 0
PICK:
attempts++
if attempts > maxRandPortAttempts {
return
}
randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort)
if _, ok := idx.UsedPorts[ipStr][randPort]; ok {
goto PICK
}
if intContains(offer.ReservedPorts, randPort) {
goto PICK
}
offer.ReservedPorts = append(offer.ReservedPorts, randPort)
}
// Stop, we have an offer!
out = offer
return true
})
return
}
// intContains scans an integer slice for a value
func intContains(haystack []int, needle int) bool {
for _, item := range haystack {
if item == needle {
return true
}
}
return false
}

203
scheduler/network_test.go Normal file
View file

@ -0,0 +1,203 @@
package scheduler
import (
"net"
"reflect"
"testing"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestNetworkIndex_SetNode(t *testing.T) {
idx := NewNetworkIndex()
n := mock.Node()
idx.SetNode(n)
if len(idx.AvailNetworks) != 1 {
t.Fatalf("Bad")
}
if idx.AvailBandwidth["eth0"] != 1000 {
t.Fatalf("Bad")
}
if idx.UsedBandwidth["eth0"] != 1 {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][22]; !ok {
t.Fatalf("Bad")
}
}
func TestNetworkIndex_AddAllocs(t *testing.T) {
idx := NewNetworkIndex()
allocs := []*structs.Allocation{
&structs.Allocation{
TaskResources: map[string]*structs.Resources{
"web": &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []int{8000, 9000},
},
},
},
},
},
&structs.Allocation{
TaskResources: map[string]*structs.Resources{
"api": &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []int{10000},
},
},
},
},
},
}
idx.AddAllocs(allocs)
if idx.UsedBandwidth["eth0"] != 70 {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok {
t.Fatalf("Bad")
}
if _, ok := idx.UsedPorts["192.168.0.100"][10000]; !ok {
t.Fatalf("Bad")
}
}
func TestNetworkIndex_yieldIP(t *testing.T) {
idx := NewNetworkIndex()
n := mock.Node()
n.Resources.Networks[0].CIDR = "192.168.0.100/30"
idx.SetNode(n)
var out []string
idx.yieldIP(func(n *structs.NetworkResource, ip net.IP) (stop bool) {
out = append(out, ip.String())
return
})
expect := []string{"192.168.0.100", "192.168.0.101",
"192.168.0.102", "192.168.0.103"}
if !reflect.DeepEqual(out, expect) {
t.Fatalf("bad: %v", out)
}
}
func TestNetworkIndex_AssignNetwork(t *testing.T) {
idx := NewNetworkIndex()
n := mock.Node()
n.Resources.Networks[0].CIDR = "192.168.0.100/30"
idx.SetNode(n)
allocs := []*structs.Allocation{
&structs.Allocation{
TaskResources: map[string]*structs.Resources{
"web": &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []int{8000, 9000},
},
},
},
},
},
&structs.Allocation{
TaskResources: map[string]*structs.Resources{
"api": &structs.Resources{
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []int{10000},
},
},
},
},
},
}
idx.AddAllocs(allocs)
// Ask for a reserved port
ask := &structs.NetworkResource{
ReservedPorts: []int{8000},
}
offer := idx.AssignNetwork(ask)
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.101" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != 8000 {
t.Fatalf("bad: %#v", offer)
}
// Ask for dynamic ports
ask = &structs.NetworkResource{
DynamicPorts: 3,
}
offer = idx.AssignNetwork(ask)
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 3 {
t.Fatalf("bad: %#v", offer)
}
// Ask for reserved + dynamic ports
ask = &structs.NetworkResource{
ReservedPorts: []int{12345},
DynamicPorts: 3,
}
offer = idx.AssignNetwork(ask)
if offer == nil {
t.Fatalf("bad")
}
if offer.IP != "192.168.0.100" {
t.Fatalf("bad: %#v", offer)
}
if len(offer.ReservedPorts) != 4 || offer.ReservedPorts[0] != 12345 {
t.Fatalf("bad: %#v", offer)
}
// Ask for too much bandwidth
ask = &structs.NetworkResource{
MBits: 1000,
}
offer = idx.AssignNetwork(ask)
if offer != nil {
t.Fatalf("bad")
}
}
func TestIntContains(t *testing.T) {
l := []int{1, 2, 10, 20}
if intContains(l, 50) {
t.Fatalf("bad")
}
if !intContains(l, 20) {
t.Fatalf("bad")
}
if !intContains(l, 1) {
t.Fatalf("bad")
}
}