diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index 13fc5af01..1b8fa528b 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -20,10 +20,9 @@ func Node() *structs.Node {
 			IOPS: 150,
 			Networks: []*structs.NetworkResource{
 				&structs.NetworkResource{
-					Public:        true,
-					CIDR:          "192.168.0.100/32",
-					ReservedPorts: []int{22},
-					MBits:         1000,
+					Device: "eth0",
+					CIDR:   "192.168.0.100/32",
+					MBits:  1000,
 				},
 			},
 		},
@@ -31,6 +30,14 @@ func Node() *structs.Node {
 			CPU:      0.1,
 			MemoryMB: 256,
 			DiskMB:   4 * 1024,
+			Networks: []*structs.NetworkResource{
+				&structs.NetworkResource{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []int{22},
+					MBits:         1,
+				},
+			},
 		},
 		Links: map[string]string{
 			"consul": "foobar.dc1",
@@ -75,6 +82,12 @@ func Job() *structs.Job {
 					Resources: &structs.Resources{
 						CPU:      0.5,
 						MemoryMB: 256,
+						Networks: []*structs.NetworkResource{
+							&structs.NetworkResource{
+								MBits:        50,
+								DynamicPorts: 1,
+							},
+						},
 					},
 				},
 			},
@@ -113,16 +126,30 @@ func Alloc() *structs.Allocation {
 		NodeID:    "foo",
 		TaskGroup: "web",
 		Resources: &structs.Resources{
-			CPU:      1.0,
-			MemoryMB: 1024,
-			DiskMB:   1024,
-			IOPS:     10,
+			CPU:      0.5,
+			MemoryMB: 256,
 			Networks: []*structs.NetworkResource{
 				&structs.NetworkResource{
-					Public:        true,
-					CIDR:          "192.168.0.100/32",
+					Device:        "eth0",
+					IP:            "192.168.0.100",
 					ReservedPorts: []int{12345},
 					MBits:         100,
+					DynamicPorts:  1,
+				},
+			},
+		},
+		TaskResources: map[string]*structs.Resources{
+			"web": &structs.Resources{
+				CPU:      0.5,
+				MemoryMB: 256,
+				Networks: []*structs.NetworkResource{
+					&structs.NetworkResource{
+						Device:        "eth0",
+						IP:            "192.168.0.100",
+						ReservedPorts: []int{5000},
+						MBits:         50,
+						DynamicPorts:  1,
+					},
 				},
 			},
 		},
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 06d7b0735..ad45749cb 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -194,6 +194,6 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 	proposed = append(proposed, plan.NodeAllocation[nodeID]...)
 
 	// Check if these allocations fit
-	fit, _, err := structs.AllocsFit(node, proposed)
+	fit, _, err := structs.AllocsFit(node, proposed, nil)
 	return fit, err
 }
diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go
index 016b327ca..4d2bae575 100644
--- a/nomad/structs/funcs.go
+++ b/nomad/structs/funcs.go
@@ -41,32 +41,13 @@ func FilterTerminalAllocs(allocs []*Allocation) []*Allocation {
 	return allocs[:n]
 }
 
-// PortsOvercommited checks if any ports are over-committed.
-// This does not handle CIDR subsets, and computes for the entire
-// CIDR block currently.
-func PortsOvercommited(r *Resources) bool {
-	for _, net := range r.Networks {
-		ports := make(map[int]struct{})
-		for _, port := range net.ReservedPorts {
-			if _, ok := ports[port]; ok {
-				return true
-			}
-			ports[port] = struct{}{}
-		}
-	}
-	return false
-}
-
-// AllocsFit checks if a given set of allocations will fit on a node
-func AllocsFit(node *Node, allocs []*Allocation) (bool, *Resources, error) {
+// AllocsFit checks if a given set of allocations will fit on a node.
+// The netIdx can optionally be provided if it's already been computed.
+// If the netIdx is provided, it is assumed that the client has already
+// ensured there are no collisions.
+func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, *Resources, error) {
 	// Compute the utilization from zero
 	used := new(Resources)
-	for _, net := range node.Resources.Networks {
-		used.Networks = append(used.Networks, &NetworkResource{
-			Public: net.Public,
-			CIDR:   net.CIDR,
-		})
-	}
 
 	// Add the reserved resources of the node
 	if node.Reserved != nil {
@@ -88,8 +69,16 @@ func AllocsFit(node *Node, allocs []*Allocation) (bool, *Resources, error) {
 		return false, used, nil
 	}
 
-	// Ensure ports are not over commited
-	if PortsOvercommited(used) {
+	// Create the network index if missing
+	if netIdx == nil {
+		netIdx = NewNetworkIndex()
+		if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
+			return false, used, nil
+		}
+	}
+
+	// Check if the network is overcommitted
+	if netIdx.Overcommitted() {
 		return false, used, nil
 	}
diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go
index 4411f7728..06899b151 100644
--- a/nomad/structs/funcs_test.go
+++ b/nomad/structs/funcs_test.go
@@ -39,25 +39,48 @@ func TestFilterTerminalALlocs(t *testing.T) {
 	}
 }
 
-func TestPortsOvercommitted(t *testing.T) {
-	r := &Resources{
-		Networks: []*NetworkResource{
-			&NetworkResource{
-				ReservedPorts: []int{22, 80},
-			},
-			&NetworkResource{
-				ReservedPorts: []int{22, 80},
+func TestAllocsFit_PortsOvercommitted(t *testing.T) {
+	n := &Node{
+		Resources: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					CIDR:  "10.0.0.0/8",
+					MBits: 100,
+				},
 			},
 		},
 	}
-	if PortsOvercommited(r) {
-		t.Fatalf("bad")
+
+	a1 := &Allocation{
+		TaskResources: map[string]*Resources{
+			"web": &Resources{
+				Networks: []*NetworkResource{
+					&NetworkResource{
+						IP:            "10.0.0.1",
+						MBits:         50,
+						ReservedPorts: []int{8000},
+					},
+				},
+			},
+		},
 	}
 
-	// Overcommit 22
-	r.Networks[1].ReservedPorts[1] = 22
-	if !PortsOvercommited(r) {
-		t.Fatalf("bad")
+	// Should fit one allocation
+	fit, _, err := AllocsFit(n, []*Allocation{a1}, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if !fit {
+		t.Fatalf("Bad")
+	}
+
+	// Should not fit second allocation
+	fit, _, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if fit {
+		t.Fatalf("Bad")
 	}
 }
 
@@ -82,7 +105,7 @@ func TestAllocsFit(t *testing.T) {
 			IOPS: 50,
 			Networks: []*NetworkResource{
 				&NetworkResource{
-					CIDR:          "10.0.0.0/8",
+					IP:            "10.0.0.1",
 					MBits:         50,
 					ReservedPorts: []int{80},
 				},
@@ -98,7 +121,7 @@ func TestAllocsFit(t *testing.T) {
 			IOPS: 50,
 			Networks: []*NetworkResource{
 				&NetworkResource{
-					CIDR:          "10.0.0.0/8",
+					IP:            "10.0.0.1",
 					MBits:         50,
 					ReservedPorts: []int{8000},
 				},
@@ -107,7 +130,7 @@ func TestAllocsFit(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, used, err := AllocsFit(n, []*Allocation{a1})
+	fit, used, err := AllocsFit(n, []*Allocation{a1}, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -124,7 +147,7 @@ func TestAllocsFit(t *testing.T) {
 	}
 
 	// Should not fit second allocation
-	fit, used, err = AllocsFit(n, []*Allocation{a1, a1})
+	fit, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
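Reviewer note (not part of the patch): with the new AllocsFit signature, callers can either pass nil and let AllocsFit build and collision-check its own NetworkIndex, or pass a pre-built index to skip that work. A minimal sketch of both call patterns, assuming node and allocs are already-populated values; the helper itself is hypothetical:

    package example

    import "github.com/hashicorp/nomad/nomad/structs"

    // checkFit sketches the two call patterns for the new signature.
    func checkFit(node *structs.Node, allocs []*structs.Allocation) (bool, error) {
        // Pattern 1: build the index once and reuse it across checks.
        // SetNode and AddAllocs return true on a collision, meaning
        // the proposed allocations cannot fit.
        netIdx := structs.NewNetworkIndex()
        if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
            return false, nil
        }
        if fit, _, err := structs.AllocsFit(node, allocs, netIdx); !fit || err != nil {
            return fit, err
        }

        // Pattern 2: pass nil and let AllocsFit construct and
        // collision-check its own index internally.
        fit, _, err := structs.AllocsFit(node, allocs, nil)
        return fit, err
    }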
diff --git a/nomad/structs/network.go b/nomad/structs/network.go
new file mode 100644
index 000000000..216e51015
--- /dev/null
+++ b/nomad/structs/network.go
@@ -0,0 +1,202 @@
+package structs
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+)
+
+const (
+	// MinDynamicPort is the smallest dynamic port generated
+	MinDynamicPort = 20000
+
+	// MaxDynamicPort is the largest dynamic port generated
+	MaxDynamicPort = 60000
+
+	// maxRandPortAttempts is the maximum number of attempts
+	// to assign a random port
+	maxRandPortAttempts = 20
+)
+
+// NetworkIndex is used to index the available network resources
+// and the used network resources on a machine given allocations
+type NetworkIndex struct {
+	AvailNetworks  []*NetworkResource          // List of available networks
+	AvailBandwidth map[string]int              // Bandwidth by device
+	UsedPorts      map[string]map[int]struct{} // Ports by IP
+	UsedBandwidth  map[string]int              // Bandwidth by device
+}
+
+// NewNetworkIndex is used to construct a new network index
+func NewNetworkIndex() *NetworkIndex {
+	return &NetworkIndex{
+		AvailBandwidth: make(map[string]int),
+		UsedPorts:      make(map[string]map[int]struct{}),
+		UsedBandwidth:  make(map[string]int),
+	}
+}
+
+// Overcommitted checks if the network is overcommitted
+func (idx *NetworkIndex) Overcommitted() bool {
+	for device, used := range idx.UsedBandwidth {
+		avail := idx.AvailBandwidth[device]
+		if used > avail {
+			return true
+		}
+	}
+	return false
+}
+
+// SetNode is used to set up the available network resources. Returns
+// true if there is a collision
+func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
+	// Add the available CIDR blocks
+	for _, n := range node.Resources.Networks {
+		if n.CIDR != "" {
+			idx.AvailNetworks = append(idx.AvailNetworks, n)
+			idx.AvailBandwidth[n.Device] = n.MBits
+		}
+	}
+
+	// Add the reserved resources
+	if r := node.Reserved; r != nil {
+		for _, n := range r.Networks {
+			if idx.AddReserved(n) {
+				collide = true
+			}
+		}
+	}
+	return
+}
+
+// AddAllocs is used to add the used network resources. Returns
+// true if there is a collision
+func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
+	for _, alloc := range allocs {
+		for _, task := range alloc.TaskResources {
+			if len(task.Networks) == 0 {
+				continue
+			}
+			n := task.Networks[0]
+			if idx.AddReserved(n) {
+				collide = true
+			}
+		}
+	}
+	return
+}
+
+// AddReserved is used to add a reserved network usage, returns true
+// if there is a port collision
+func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
+	// Add the port usage
+	used := idx.UsedPorts[n.IP]
+	if used == nil {
+		used = make(map[int]struct{})
+		idx.UsedPorts[n.IP] = used
+	}
+	for _, port := range n.ReservedPorts {
+		if _, ok := used[port]; ok {
+			collide = true
+		} else {
+			used[port] = struct{}{}
+		}
+	}
+
+	// Add the bandwidth
+	idx.UsedBandwidth[n.Device] += n.MBits
+	return
+}
+
+// yieldIP is used to iteratively invoke the callback with
+// an available IP
+func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
+	inc := func(ip net.IP) {
+		for j := len(ip) - 1; j >= 0; j-- {
+			ip[j]++
+			if ip[j] > 0 {
+				break
+			}
+		}
+	}
+
+	for _, n := range idx.AvailNetworks {
+		ip, ipnet, err := net.ParseCIDR(n.CIDR)
+		if err != nil {
+			continue
+		}
+		for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
+			if cb(n, ip) {
+				return
+			}
+		}
+	}
+}
+
+// AssignNetwork is used to assign network resources given an ask.
+// If the ask cannot be satisfied, returns nil.
+func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
+	idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
+		// Convert the IP to a string
+		ipStr := ip.String()
+
+		// Check if we would exceed the bandwidth cap
+		availBandwidth := idx.AvailBandwidth[n.Device]
+		usedBandwidth := idx.UsedBandwidth[n.Device]
+		if usedBandwidth+ask.MBits > availBandwidth {
+			err = fmt.Errorf("bandwidth exceeded")
+			return
+		}
+
+		// Check if any of the reserved ports are in use
+		for _, port := range ask.ReservedPorts {
+			if _, ok := idx.UsedPorts[ipStr][port]; ok {
+				err = fmt.Errorf("reserved port collision")
+				return
+			}
+		}
+
+		// Create the offer
+		offer := &NetworkResource{
+			Device:        n.Device,
+			IP:            ipStr,
+			ReservedPorts: ask.ReservedPorts,
+		}
+
+		// Check if we need to generate any ports
+		for i := 0; i < ask.DynamicPorts; i++ {
+			attempts := 0
+		PICK:
+			attempts++
+			if attempts > maxRandPortAttempts {
+				err = fmt.Errorf("dynamic port selection failed")
+				return
+			}
+
+			randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort)
+			if _, ok := idx.UsedPorts[ipStr][randPort]; ok {
+				goto PICK
+			}
+			if IntContains(offer.ReservedPorts, randPort) {
+				goto PICK
+			}
+			offer.ReservedPorts = append(offer.ReservedPorts, randPort)
+		}
+
+		// Stop, we have an offer!
+		out = offer
+		err = nil
+		return true
+	})
+	return
+}
+
+// IntContains scans an integer slice for a value
+func IntContains(haystack []int, needle int) bool {
+	for _, item := range haystack {
+		if item == needle {
+			return true
+		}
+	}
+	return false
+}
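Reviewer note (not part of the patch): end to end, the new index is used as below — index the node, make an ask, then record the offer so later asks cannot collide with it. A runnable sketch with made-up values:

    package main

    import (
        "fmt"

        "github.com/hashicorp/nomad/nomad/structs"
    )

    func main() {
        // Index a node exposing one 1000 MBit device on a /30 block.
        idx := structs.NewNetworkIndex()
        idx.SetNode(&structs.Node{
            Resources: &structs.Resources{
                Networks: []*structs.NetworkResource{
                    &structs.NetworkResource{
                        Device: "eth0",
                        CIDR:   "10.0.0.0/30",
                        MBits:  1000,
                    },
                },
            },
        })

        // Ask for bandwidth, one reserved port, and two dynamic ports.
        offer, err := idx.AssignNetwork(&structs.NetworkResource{
            MBits:         100,
            ReservedPorts: []int{8080},
            DynamicPorts:  2,
        })
        if err != nil {
            panic(err)
        }

        // Record the offer so the next ask cannot collide with it,
        // mirroring what BinPackIterator does per task.
        idx.AddReserved(offer)
        fmt.Println(offer.IP, offer.ReservedPorts)
    }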
diff --git a/nomad/structs/network_test.go b/nomad/structs/network_test.go
new file mode 100644
index 000000000..1db783d65
--- /dev/null
+++ b/nomad/structs/network_test.go
@@ -0,0 +1,349 @@
+package structs
+
+import (
+	"net"
+	"reflect"
+	"testing"
+)
+
+func TestNetworkIndex_Overcommitted(t *testing.T) {
+	idx := NewNetworkIndex()
+
+	// Consume some network
+	reserved := &NetworkResource{
+		Device:        "eth0",
+		IP:            "192.168.0.100",
+		MBits:         505,
+		ReservedPorts: []int{8000, 9000},
+	}
+	collide := idx.AddReserved(reserved)
+	if collide {
+		t.Fatalf("bad")
+	}
+	if !idx.Overcommitted() {
+		t.Fatalf("have no resources")
+	}
+
+	// Add resources
+	n := &Node{
+		Resources: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device: "eth0",
+					CIDR:   "192.168.0.100/32",
+					MBits:  1000,
+				},
+			},
+		},
+	}
+	idx.SetNode(n)
+	if idx.Overcommitted() {
+		t.Fatalf("have resources")
+	}
+
+	// Double up our usage
+	idx.AddReserved(reserved)
+	if !idx.Overcommitted() {
+		t.Fatalf("should be overcommitted")
+	}
+}
+
+func TestNetworkIndex_SetNode(t *testing.T) {
+	idx := NewNetworkIndex()
+	n := &Node{
+		Resources: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device: "eth0",
+					CIDR:   "192.168.0.100/32",
+					MBits:  1000,
+				},
+			},
+		},
+		Reserved: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []int{22},
+					MBits:         1,
+				},
+			},
+		},
+	}
+	collide := idx.SetNode(n)
+	if collide {
+		t.Fatalf("bad")
+	}
+
+	if len(idx.AvailNetworks) != 1 {
+		t.Fatalf("Bad")
+	}
+	if idx.AvailBandwidth["eth0"] != 1000 {
+		t.Fatalf("Bad")
+	}
+	if idx.UsedBandwidth["eth0"] != 1 {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][22]; !ok {
+		t.Fatalf("Bad")
+	}
+}
+
+func TestNetworkIndex_AddAllocs(t *testing.T) {
+	idx := NewNetworkIndex()
+	allocs := []*Allocation{
+		&Allocation{
+			TaskResources: map[string]*Resources{
+				"web": &Resources{
+					Networks: []*NetworkResource{
+						&NetworkResource{
+							Device:        "eth0",
+							IP:            "192.168.0.100",
+							MBits:         20,
+							ReservedPorts: []int{8000, 9000},
+						},
+					},
+				},
+			},
+		},
+		&Allocation{
+			TaskResources: map[string]*Resources{
+				"api": &Resources{
+					Networks: []*NetworkResource{
+						&NetworkResource{
+							Device:        "eth0",
+							IP:            "192.168.0.100",
+							MBits:         50,
+							ReservedPorts: []int{10000},
+						},
+					},
+				},
+			},
+		},
+	}
+	collide := idx.AddAllocs(allocs)
+	if collide {
+		t.Fatalf("bad")
+	}
+
+	if idx.UsedBandwidth["eth0"] != 70 {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][10000]; !ok {
+		t.Fatalf("Bad")
+	}
+}
+
+func TestNetworkIndex_AddReserved(t *testing.T) {
+	idx := NewNetworkIndex()
+
+	reserved := &NetworkResource{
+		Device:        "eth0",
+		IP:            "192.168.0.100",
+		MBits:         20,
+		ReservedPorts: []int{8000, 9000},
+	}
+	collide := idx.AddReserved(reserved)
+	if collide {
+		t.Fatalf("bad")
+	}
+
+	if idx.UsedBandwidth["eth0"] != 20 {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok {
+		t.Fatalf("Bad")
+	}
+	if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok {
+		t.Fatalf("Bad")
+	}
+
+	// Try to reserve the same network
+	collide = idx.AddReserved(reserved)
+	if !collide {
+		t.Fatalf("bad")
+	}
+}
+
+func TestNetworkIndex_yieldIP(t *testing.T) {
+	idx := NewNetworkIndex()
+	n := &Node{
+		Resources: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device: "eth0",
+					CIDR:   "192.168.0.100/30",
+					MBits:  1000,
+				},
+			},
+		},
+		Reserved: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []int{22},
+					MBits:         1,
+				},
+			},
+		},
+	}
+	idx.SetNode(n)
+
+	var out []string
+	idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
+		out = append(out, ip.String())
+		return
+	})
+
+	expect := []string{"192.168.0.100", "192.168.0.101",
+		"192.168.0.102", "192.168.0.103"}
+	if !reflect.DeepEqual(out, expect) {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
+func TestNetworkIndex_AssignNetwork(t *testing.T) {
+	idx := NewNetworkIndex()
+	n := &Node{
+		Resources: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device: "eth0",
+					CIDR:   "192.168.0.100/30",
+					MBits:  1000,
+				},
+			},
+		},
+		Reserved: &Resources{
+			Networks: []*NetworkResource{
+				&NetworkResource{
+					Device:        "eth0",
+					IP:            "192.168.0.100",
+					ReservedPorts: []int{22},
+					MBits:         1,
+				},
+			},
+		},
+	}
+	idx.SetNode(n)
+
+	allocs := []*Allocation{
+		&Allocation{
+			TaskResources: map[string]*Resources{
+				"web": &Resources{
+					Networks: []*NetworkResource{
+						&NetworkResource{
+							Device:        "eth0",
+							IP:            "192.168.0.100",
+							MBits:         20,
+							ReservedPorts: []int{8000, 9000},
+						},
+					},
+				},
+			},
+		},
+		&Allocation{
+			TaskResources: map[string]*Resources{
+				"api": &Resources{
+					Networks: []*NetworkResource{
+						&NetworkResource{
+							Device:        "eth0",
+							IP:            "192.168.0.100",
+							MBits:         50,
+							ReservedPorts: []int{10000},
+						},
+					},
+				},
+			},
+		},
+	}
+	idx.AddAllocs(allocs)
+
+	// Ask for a reserved port
+	ask := &NetworkResource{
+		ReservedPorts: []int{8000},
+	}
+	offer, err := idx.AssignNetwork(ask)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if offer == nil {
+		t.Fatalf("bad")
+	}
+	if offer.IP != "192.168.0.101" {
+		t.Fatalf("bad: %#v", offer)
+	}
+	if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != 8000 {
+		t.Fatalf("bad: %#v", offer)
+	}
+
+	// Ask for dynamic ports
+	ask = &NetworkResource{
+		DynamicPorts: 3,
+	}
+	offer, err = idx.AssignNetwork(ask)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if offer == nil {
+		t.Fatalf("bad")
+	}
+	if offer.IP != "192.168.0.100" {
+		t.Fatalf("bad: %#v", offer)
+	}
+	if len(offer.ReservedPorts) != 3 {
+		t.Fatalf("bad: %#v", offer)
+	}
+
+	// Ask for reserved + dynamic ports
+	ask = &NetworkResource{
+		ReservedPorts: []int{12345},
+		DynamicPorts:  3,
+	}
+	offer, err = idx.AssignNetwork(ask)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if offer == nil {
+		t.Fatalf("bad")
+	}
+	if offer.IP != "192.168.0.100" {
+		t.Fatalf("bad: %#v", offer)
+	}
+	if len(offer.ReservedPorts) != 4 || offer.ReservedPorts[0] != 12345 {
+		t.Fatalf("bad: %#v", offer)
+	}
+
+	// Ask for too much bandwidth
+	ask = &NetworkResource{
+		MBits: 1000,
+	}
+	offer, err = idx.AssignNetwork(ask)
+	if err.Error() != "bandwidth exceeded" {
+		t.Fatalf("err: %v", err)
+	}
+	if offer != nil {
+		t.Fatalf("bad")
+	}
+}
+
+func TestIntContains(t *testing.T) {
+	l := []int{1, 2, 10, 20}
+	if IntContains(l, 50) {
+		t.Fatalf("bad")
+	}
+	if !IntContains(l, 20) {
+		t.Fatalf("bad")
+	}
+	if !IntContains(l, 1) {
+		t.Fatalf("bad")
+	}
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 418287e12..a49e1b385 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -538,12 +538,22 @@ type Resources struct {
 	Networks []*NetworkResource
 }
 
-// NetIndexByCIDR scans the list of networks for a matching
-// CIDR, returning the index. This currently ONLY handles
-// an exact match and not a subset CIDR.
-func (r *Resources) NetIndexByCIDR(cidr string) int {
+// Copy returns a deep copy of the resources
+func (r *Resources) Copy() *Resources {
+	newR := new(Resources)
+	*newR = *r
+	n := len(r.Networks)
+	newR.Networks = make([]*NetworkResource, n)
+	for i := 0; i < n; i++ {
+		newR.Networks[i] = r.Networks[i].Copy()
+	}
+	return newR
+}
+
+// NetIndex finds the matching net index using device name
+func (r *Resources) NetIndex(n *NetworkResource) int {
 	for idx, net := range r.Networks {
-		if net.CIDR == cidr {
+		if net.Device == n.Device {
 			return idx
 		}
 	}
@@ -551,7 +561,8 @@ func (r *Resources) NetIndexByCIDR(cidr string) int {
 }
 
 // Superset checks if one set of resources is a superset
-// of another.
+// of another. This ignores network resources, and the NetworkIndex
+// should be used for that.
 func (r *Resources) Superset(other *Resources) bool {
 	if r.CPU < other.CPU {
 		return false
@@ -565,21 +576,6 @@ func (r *Resources) Superset(other *Resources) bool {
 	if r.IOPS < other.IOPS {
 		return false
 	}
-	for _, net := range r.Networks {
-		idx := other.NetIndexByCIDR(net.CIDR)
-		if idx >= 0 {
-			if net.MBits < other.Networks[idx].MBits {
-				return false
-			}
-		}
-	}
-	// Check that other does not have a network we are missing
-	for _, net := range other.Networks {
-		idx := r.NetIndexByCIDR(net.CIDR)
-		if idx == -1 {
-			return false
-		}
-	}
 	return true
 }
 
@@ -594,12 +590,14 @@ func (r *Resources) Add(delta *Resources) error {
 	r.DiskMB += delta.DiskMB
 	r.IOPS += delta.IOPS
 
-	for _, net := range delta.Networks {
-		idx := r.NetIndexByCIDR(net.CIDR)
+	for _, n := range delta.Networks {
+		// Find the matching interface by device name
+		idx := r.NetIndex(n)
 		if idx == -1 {
-			return fmt.Errorf("missing network for CIDR %s", net.CIDR)
+			r.Networks = append(r.Networks, n.Copy())
+		} else {
+			r.Networks[idx].Add(n)
 		}
-		r.Networks[idx].Add(net)
 	}
 	return nil
 }
@@ -607,10 +605,21 @@ func (r *Resources) Add(delta *Resources) error {
 // NetworkResource is used to represesent available network
 // resources
 type NetworkResource struct {
-	Public        bool   // Is this a public address?
+	Device        string // Name of the device
 	CIDR          string // CIDR block of addresses
+	IP            string // IP address
 	ReservedPorts []int  // Reserved ports
 	MBits         int    // Throughput
+	DynamicPorts  int    // Dynamically assigned ports
+}
+
+// Copy returns a deep copy of the network resource
+func (n *NetworkResource) Copy() *NetworkResource {
+	newR := new(NetworkResource)
+	*newR = *n
+	newR.ReservedPorts = make([]int, len(n.ReservedPorts))
+	copy(newR.ReservedPorts, n.ReservedPorts)
+	return newR
 }
 
 // Add adds the resources of the delta to this, potentially
@@ -620,13 +629,13 @@ func (n *NetworkResource) Add(delta *NetworkResource) {
 		n.ReservedPorts = append(n.ReservedPorts, delta.ReservedPorts...)
 	}
 	n.MBits += delta.MBits
+	n.DynamicPorts += delta.DynamicPorts
 }
 
 const (
 	// JobTypeNomad is reserved for internal system tasks and is
 	// always handled by the CoreScheduler.
 	JobTypeCore    = "_core"
-	JobTypeSystem  = "system"
 	JobTypeService = "service"
 	JobTypeBatch   = "batch"
 )
@@ -871,10 +880,14 @@ type Allocation struct {
 	// TaskGroup is the name of the task group that should be run
 	TaskGroup string
 
-	// Resources is the set of resources allocated as part
+	// Resources is the total set of resources allocated as part
 	// of this allocation of the task group.
 	Resources *Resources
 
+	// TaskResources is the set of resources allocated to each
+	// task. These should sum to the total Resources.
+	TaskResources map[string]*Resources
+
 	// Metrics associated with this allocation
 	Metrics *AllocMetric
 
@@ -964,6 +977,9 @@ type AllocMetric struct {
 	// ClassExhausted is the number of nodes exhausted by class
 	ClassExhausted map[string]int
 
+	// DimensionExhaused provides the count by dimension or reason
+	DimensionExhaused map[string]int
+
 	// Scores is the scores of the final few nodes remaining
 	// for placement. The top score is typically selected.
 	Scores map[string]float64
@@ -999,7 +1015,7 @@ func (a *AllocMetric) FilterNode(node *Node, constraint string) {
 	}
 }
 
-func (a *AllocMetric) ExhaustedNode(node *Node) {
+func (a *AllocMetric) ExhaustedNode(node *Node, dimension string) {
 	a.NodesExhausted += 1
 	if node != nil && node.NodeClass != "" {
 		if a.ClassExhausted == nil {
@@ -1007,6 +1023,12 @@ func (a *AllocMetric) ExhaustedNode(node *Node) {
 		}
 		a.ClassExhausted[node.NodeClass] += 1
 	}
+	if dimension != "" {
+		if a.DimensionExhaused == nil {
+			a.DimensionExhaused = make(map[string]int)
+		}
+		a.DimensionExhaused[dimension] += 1
+	}
 }
 
 func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index e9debea19..259f51135 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -5,20 +5,21 @@ import (
 	"testing"
 )
 
-func TestResource_NetIndexByCIDR(t *testing.T) {
+func TestResource_NetIndex(t *testing.T) {
 	r := &Resources{
 		Networks: []*NetworkResource{
-			&NetworkResource{CIDR: "10.0.0.0/8"},
-			&NetworkResource{CIDR: "127.0.0.0/24"},
+			&NetworkResource{Device: "eth0"},
+			&NetworkResource{Device: "lo0"},
+			&NetworkResource{Device: ""},
 		},
 	}
-	if idx := r.NetIndexByCIDR("10.0.0.0/8"); idx != 0 {
+	if idx := r.NetIndex(&NetworkResource{Device: "eth0"}); idx != 0 {
 		t.Fatalf("Bad: %d", idx)
 	}
-	if idx := r.NetIndexByCIDR("127.0.0.0/24"); idx != 1 {
+	if idx := r.NetIndex(&NetworkResource{Device: "lo0"}); idx != 1 {
 		t.Fatalf("Bad: %d", idx)
 	}
-	if idx := r.NetIndexByCIDR("10.0.0.0/16"); idx != -1 {
+	if idx := r.NetIndex(&NetworkResource{Device: "eth1"}); idx != -1 {
 		t.Fatalf("Bad: %d", idx)
 	}
 }
@@ -29,24 +30,12 @@ func TestResource_Superset(t *testing.T) {
 		MemoryMB: 2048,
 		DiskMB:   10000,
 		IOPS:     100,
-		Networks: []*NetworkResource{
-			&NetworkResource{
-				CIDR:  "10.0.0.0/8",
-				MBits: 100,
-			},
-		},
 	}
 	r2 := &Resources{
 		CPU:      1.0,
 		MemoryMB: 1024,
 		DiskMB:   5000,
 		IOPS:     50,
-		Networks: []*NetworkResource{
-			&NetworkResource{
-				CIDR:  "10.0.0.0/8",
-				MBits: 50,
-			},
-		},
 	}
 
 	if !r1.Superset(r1) {
@@ -84,7 +73,7 @@ func TestResource_Add(t *testing.T) {
 		IOPS: 50,
 		Networks: []*NetworkResource{
 			&NetworkResource{
-				CIDR:          "10.0.0.0/8",
+				IP:            "10.0.0.1",
 				MBits:         50,
 				ReservedPorts: []int{80},
 			},
@@ -115,6 +104,48 @@ func TestResource_Add(t *testing.T) {
 	}
 }
 
+func TestResource_Add_Network(t *testing.T) {
+	r1 := &Resources{}
+	r2 := &Resources{
+		Networks: []*NetworkResource{
+			&NetworkResource{
+				MBits:        50,
+				DynamicPorts: 2,
+			},
+		},
+	}
+	r3 := &Resources{
+		Networks: []*NetworkResource{
+			&NetworkResource{
+				MBits:        25,
+				DynamicPorts: 1,
+			},
+		},
+	}
+
+	err := r1.Add(r2)
+	if err != nil {
+		t.Fatalf("Err: %v", err)
+	}
+	err = r1.Add(r3)
+	if err != nil {
+		t.Fatalf("Err: %v", err)
+	}
+
+	expect := &Resources{
+		Networks: []*NetworkResource{
+			&NetworkResource{
+				MBits:        75,
+				DynamicPorts: 3,
+			},
+		},
+	}
+
+	if !reflect.DeepEqual(expect.Networks, r1.Networks) {
+		t.Fatalf("bad: %#v %#v", expect, r1)
+	}
+}
+
 func TestEncodeDecode(t *testing.T) {
 	type FooRequest struct {
 		Foo string
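Reviewer note (not part of the patch): TestResource_Add_Network above pins down the new merge behavior of Resources.Add — networks are matched by device via NetIndex and merged, while unmatched networks are appended rather than rejected. A small runnable sketch of summing per-task asks into a task-group total, with made-up values:

    package main

    import (
        "fmt"

        "github.com/hashicorp/nomad/nomad/structs"
    )

    func main() {
        total := new(structs.Resources)
        web := &structs.Resources{
            Networks: []*structs.NetworkResource{
                &structs.NetworkResource{Device: "eth0", MBits: 50, DynamicPorts: 1},
            },
        }
        api := &structs.Resources{
            Networks: []*structs.NetworkResource{
                &structs.NetworkResource{Device: "eth0", MBits: 25, DynamicPorts: 2},
            },
        }

        // First Add appends a copy of the eth0 network; the second
        // finds the matching device and merges into it.
        total.Add(web)
        total.Add(api)
        fmt.Println(total.Networks[0].MBits)        // 75
        fmt.Println(total.Networks[0].DynamicPorts) // 3
    }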
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 47dcdfbdf..e51280ca9 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -311,6 +311,15 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
 			continue
 		}
 
+		// Restore the network offers from the existing allocation.
+		// We do not allow network resources (reserved/dynamic ports)
+		// to be updated. This is guarded in taskUpdated, so we can
+		// safely restore those here.
+		for task, resources := range option.TaskResources {
+			existing := update.Alloc.TaskResources[task]
+			resources.Networks = existing.Networks
+		}
+
 		// Create a shallow copy
 		newAlloc := new(structs.Allocation)
 		*newAlloc = *update.Alloc
@@ -319,6 +328,7 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
 		newAlloc.EvalID = s.eval.ID
 		newAlloc.Job = s.job
 		newAlloc.Resources = size
+		newAlloc.TaskResources = option.TaskResources
 		newAlloc.Metrics = s.ctx.Metrics()
 		newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
 		newAlloc.ClientStatus = structs.AllocClientStatusPending
@@ -361,36 +371,29 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
 		// Attempt to match the task group
 		option, size := s.stack.Select(missing.TaskGroup)
 
-		// Handle a placement failure
-		var nodeID, status, desc, clientStatus string
-		if option == nil {
-			status = structs.AllocDesiredStatusFailed
-			desc = "failed to find a node for placement"
-			clientStatus = structs.AllocClientStatusFailed
-		} else {
-			nodeID = option.Node.ID
-			status = structs.AllocDesiredStatusRun
-			clientStatus = structs.AllocClientStatusPending
-		}
-
 		// Create an allocation for this
 		alloc := &structs.Allocation{
-			ID:                 structs.GenerateUUID(),
-			EvalID:             s.eval.ID,
-			Name:               missing.Name,
-			NodeID:             nodeID,
-			JobID:              s.job.ID,
-			Job:                s.job,
-			TaskGroup:          missing.TaskGroup.Name,
-			Resources:          size,
-			Metrics:            s.ctx.Metrics(),
-			DesiredStatus:      status,
-			DesiredDescription: desc,
-			ClientStatus:       clientStatus,
+			ID:        structs.GenerateUUID(),
+			EvalID:    s.eval.ID,
+			Name:      missing.Name,
+			JobID:     s.job.ID,
+			Job:       s.job,
+			TaskGroup: missing.TaskGroup.Name,
+			Resources: size,
+			Metrics:   s.ctx.Metrics(),
 		}
-		if nodeID != "" {
+
+		// Set fields based on if we found an allocation option
+		if option != nil {
+			alloc.NodeID = option.Node.ID
+			alloc.TaskResources = option.TaskResources
+			alloc.DesiredStatus = structs.AllocDesiredStatusRun
+			alloc.ClientStatus = structs.AllocClientStatusPending
 			s.plan.AppendAlloc(alloc)
 		} else {
+			alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+			alloc.DesiredDescription = "failed to find a node for placement"
+			alloc.ClientStatus = structs.AllocClientStatusFailed
 			s.plan.AppendFailed(alloc)
 			failedTG[missing.TaskGroup] = alloc
 		}
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 6adbff991..61a1534d2 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -382,6 +382,15 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
 		t.Fatalf("bad: %#v", out)
 	}
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+
+	// Verify the network did not change
+	for _, alloc := range out {
+		for _, resources := range alloc.TaskResources {
+			if resources.Networks[0].ReservedPorts[0] != 5000 {
+				t.Fatalf("bad: %#v", alloc)
+			}
+		}
+	}
 }
 
 func TestServiceSched_JobDeregister(t *testing.T) {
diff --git a/scheduler/rank.go b/scheduler/rank.go
index 740a68a11..e9f65e098 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -10,8 +10,9 @@ import (
 // along with a node when iterating. This state can be modified as
 // various rank methods are applied.
 type RankedNode struct {
-	Node  *structs.Node
-	Score float64
+	Node          *structs.Node
+	Score         float64
+	TaskResources map[string]*structs.Resources
 
 	// Allocs is used to cache the proposed allocations on the
 	// node. This can be shared between iterators that require it.
@@ -35,6 +36,14 @@ func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error)
 	return p, nil
 }
 
+func (r *RankedNode) SetTaskResources(task *structs.Task,
+	resource *structs.Resources) {
+	if r.TaskResources == nil {
+		r.TaskResources = make(map[string]*structs.Resources)
+	}
+	r.TaskResources[task.Name] = resource
+}
+
 // RankFeasibleIterator is used to iteratively yield nodes along
 // with ranking metadata. The iterators may manage some state for
 // performance optimizations.
@@ -122,35 +131,35 @@ func (iter *StaticRankIterator) Reset() {
 // BinPackIterator is a RankIterator that scores potential options
 // based on a bin-packing algorithm.
 type BinPackIterator struct {
-	ctx       Context
-	source    RankIterator
-	resources *structs.Resources
-	evict     bool
-	priority  int
+	ctx      Context
+	source   RankIterator
+	evict    bool
+	priority int
+	tasks    []*structs.Task
 }
 
-// NewBinPackIterator returns a BinPackIterator which tries to fit the given
-// resources, potentially evicting other tasks based on a given priority.
-func NewBinPackIterator(ctx Context, source RankIterator, resources *structs.Resources, evict bool, priority int) *BinPackIterator {
+// NewBinPackIterator returns a BinPackIterator which tries to fit tasks,
+// potentially evicting other tasks based on a given priority.
+func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
 	iter := &BinPackIterator{
-		ctx:       ctx,
-		source:    source,
-		resources: resources,
-		evict:     evict,
-		priority:  priority,
+		ctx:      ctx,
+		source:   source,
+		evict:    evict,
+		priority: priority,
 	}
 	return iter
 }
 
-func (iter *BinPackIterator) SetResources(r *structs.Resources) {
-	iter.resources = r
-}
-
 func (iter *BinPackIterator) SetPriority(p int) {
 	iter.priority = p
 }
 
+func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) {
+	iter.tasks = tasks
+}
+
 func (iter *BinPackIterator) Next() *RankedNode {
+OUTER:
 	for {
 		// Get the next potential option
 		option := iter.source.Next()
@@ -167,13 +176,47 @@ func (iter *BinPackIterator) Next() *RankedNode {
 			continue
 		}
 
+		// Index the existing network usage
+		netIdx := structs.NewNetworkIndex()
+		netIdx.SetNode(option.Node)
+		netIdx.AddAllocs(proposed)
+
+		// Assign the resources for each task
+		total := new(structs.Resources)
+		for _, task := range iter.tasks {
+			taskResources := task.Resources.Copy()
+
+			// Check if we need a network resource
+			if len(taskResources.Networks) > 0 {
+				ask := taskResources.Networks[0]
+				offer, err := netIdx.AssignNetwork(ask)
+				if offer == nil {
+					iter.ctx.Metrics().ExhaustedNode(option.Node,
+						fmt.Sprintf("network: %s", err))
+					continue OUTER
+				}
+
+				// Reserve this to prevent another task from colliding
+				netIdx.AddReserved(offer)
+
+				// Update the network ask to the offer
+				taskResources.Networks = []*structs.NetworkResource{offer}
+			}
+
+			// Store the task resource
+			option.SetTaskResources(task, taskResources)
+
+			// Accumulate the total resource requirement
+			total.Add(taskResources)
+		}
+
 		// Add the resources we are trying to fit
-		proposed = append(proposed, &structs.Allocation{Resources: iter.resources})
+		proposed = append(proposed, &structs.Allocation{Resources: total})
 
 		// Check if these allocations fit, if they do not, simply skip this node
-		fit, util, _ := structs.AllocsFit(option.Node, proposed)
+		fit, util, _ := structs.AllocsFit(option.Node, proposed, netIdx)
 		if !fit {
-			iter.ctx.Metrics().ExhaustedNode(option.Node)
+			iter.ctx.Metrics().ExhaustedNode(option.Node, "resources")
 			continue
 		}
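Reviewer note (not part of the patch): with SetResources removed, callers feed the bin-packer whole tasks and read per-task offers back off the ranked option. A hypothetical helper (not in the patch) showing the new call pattern from inside the scheduler package:

    // collectOffers is a hypothetical sketch; Context, RankIterator,
    // and the iterator types are the scheduler package's own.
    func collectOffers(ctx Context, source RankIterator, tg *structs.TaskGroup) []*RankedNode {
        binp := NewBinPackIterator(ctx, source, false, 0)
        binp.SetTasks(tg.Tasks)

        var out []*RankedNode
        for option := binp.Next(); option != nil; option = binp.Next() {
            // option.TaskResources now carries the concrete network
            // offer (device, IP, ports) chosen for each task.
            out = append(out, option)
        }
        return out
    }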
diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go
index 69bcdb452..605902ed4 100644
--- a/scheduler/rank_test.go
+++ b/scheduler/rank_test.go
@@ -68,11 +68,16 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
 	}
 	static := NewStaticRankIterator(ctx, nodes)
 
-	resources := &structs.Resources{
-		CPU:      1024,
-		MemoryMB: 1024,
+	task := &structs.Task{
+		Name: "web",
+		Resources: &structs.Resources{
+			CPU:      1024,
+			MemoryMB: 1024,
+		},
 	}
-	binp := NewBinPackIterator(ctx, static, resources, false, 0)
+
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTasks([]*structs.Task{task})
 
 	out := collectRanked(binp)
 	if len(out) != 2 {
@@ -137,11 +142,16 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
 		},
 	}
 
-	resources := &structs.Resources{
-		CPU:      1024,
-		MemoryMB: 1024,
+	task := &structs.Task{
+		Name: "web",
+		Resources: &structs.Resources{
+			CPU:      1024,
+			MemoryMB: 1024,
+		},
 	}
-	binp := NewBinPackIterator(ctx, static, resources, false, 0)
+
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTasks([]*structs.Task{task})
 
 	out := collectRanked(binp)
 	if len(out) != 1 {
@@ -207,11 +217,16 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 	}
 	noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
 
-	resources := &structs.Resources{
-		CPU:      1024,
-		MemoryMB: 1024,
+	task := &structs.Task{
+		Name: "web",
+		Resources: &structs.Resources{
+			CPU:      1024,
+			MemoryMB: 1024,
+		},
 	}
-	binp := NewBinPackIterator(ctx, static, resources, false, 0)
+
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTasks([]*structs.Task{task})
 
 	out := collectRanked(binp)
 	if len(out) != 1 {
@@ -280,11 +295,16 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 	plan := ctx.Plan()
 	plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
 
-	resources := &structs.Resources{
-		CPU:      1024,
-		MemoryMB: 1024,
+	task := &structs.Task{
+		Name: "web",
+		Resources: &structs.Resources{
+			CPU:      1024,
+			MemoryMB: 1024,
+		},
 	}
-	binp := NewBinPackIterator(ctx, static, resources, false, 0)
+
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTasks([]*structs.Task{task})
 
 	out := collectRanked(binp)
 	if len(out) != 2 {
diff --git a/scheduler/stack.go b/scheduler/stack.go
index fdedc738b..156c231c3 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -76,7 +76,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi
 	// by a particular task group. Only enable eviction for the service
 	// scheduler as that logic is expensive.
 	evict := !batch
-	s.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0)
+	s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)
 
 	// Apply the job anti-affinity iterator. This is to avoid placing
 	// multiple allocations on the same node for this job. The penalty
@@ -149,11 +149,18 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
 	// Update the parameters of iterators
 	s.taskGroupDrivers.SetDrivers(drivers)
 	s.taskGroupConstraint.SetConstraints(constr)
-	s.binPack.SetResources(size)
+	s.binPack.SetTasks(tg.Tasks)
 
 	// Find the node with the max score
 	option := s.maxScore.Next()
 
+	// Ensure that the task resources were specified
+	if option != nil && len(option.TaskResources) != len(tg.Tasks) {
+		for _, task := range tg.Tasks {
+			option.SetTaskResources(task, task.Resources)
+		}
+	}
+
 	// Store the compute time
 	s.ctx.Metrics().AllocationTime = time.Since(start)
 	return option, size
diff --git a/scheduler/util.go b/scheduler/util.go
index 7f8278ba4..cf7224a16 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -227,6 +227,11 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
 		if !reflect.DeepEqual(at.Config, bt.Config) {
 			return true
 		}
+
+		// Inspect the network to see if the resource ask is different
+		if !reflect.DeepEqual(at.Resources.Networks, bt.Resources.Networks) {
+			return true
+		}
 	}
 	return false
 }
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index 9ae78b75b..45c387e29 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -259,4 +259,10 @@ func TestTasksUpdated(t *testing.T) {
 	if !tasksUpdated(j1.TaskGroups[0], j5.TaskGroups[0]) {
 		t.Fatalf("bad")
 	}
+
+	j6 := mock.Job()
+	j6.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = 3
+	if !tasksUpdated(j1.TaskGroups[0], j6.TaskGroups[0]) {
+		t.Fatalf("bad")
+	}
 }
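Reviewer note (not part of the patch): the tasksUpdated guard above is what makes restoring network offers during in-place updates safe — any change to a task's network ask fails the DeepEqual check and forces a fresh placement. A stdlib-only sketch of that comparison:

    package main

    import (
        "fmt"
        "reflect"

        "github.com/hashicorp/nomad/nomad/structs"
    )

    func main() {
        // Two identical asks: in-place update remains possible.
        a := []*structs.NetworkResource{&structs.NetworkResource{MBits: 50, DynamicPorts: 1}}
        b := []*structs.NetworkResource{&structs.NetworkResource{MBits: 50, DynamicPorts: 1}}
        fmt.Println(reflect.DeepEqual(a, b)) // true

        // Bumping the dynamic port count changes the ask, so the
        // task group must be destructively updated, never patched.
        b[0].DynamicPorts = 3
        fmt.Println(reflect.DeepEqual(a, b)) // false
    }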