core: merge reserved_ports into host_networks (#13651)

Fixes #13505

This fixes #13505 by treating reserved_ports like we treat a lot of jobspec settings: merging settings from more global stanzas (client.reserved.reserved_ports) "down" into more specific stanzas (client.host_networks[].reserved_ports).

As discussed in #13505 there are other options, and since it's totally broken right now we have some flexibility:

Treat overlapping reserved_ports on addresses as invalid and refuse to start agents. However, I'm not sure there's a cohesive model we want to publish right now since so much 0.9-0.12 compat code still exists! We would have to explain to folks that if their -network-interface and host_network addresses overlapped, they could only specify reserved_ports in one place or the other?! It gets ugly.
Use the global client.reserved.reserved_ports value as the default and treat host_network[].reserverd_ports as overrides. My first suggestion in the issue, but @groggemans made me realize the addresses on the agent's interface (as configured by -network-interface) may overlap with host_networks, so you'd need to remove the global reserved_ports from addresses shared with a shared network?! This seemed really confusing and subtle for users to me.
So I think "merging down" creates the most expressive yet understandable approach. I've played around with it a bit, and it doesn't seem too surprising. The only frustrating part is how difficult it is to observe the available addresses and ports on a node! However that's a job for another PR.
This commit is contained in:
Michael Schurter 2022-07-12 14:40:25 -07:00 committed by GitHub
parent b9ebf94131
commit 3e50f72fad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 499 additions and 208 deletions

3
.changelog/13651.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where reserved ports on multiple node networks would be treated as a collision. `client.reserved.reserved_ports` is now merged into each `host_network`'s reserved ports instead of being treated as a collision.
```

View File

@ -399,6 +399,7 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
}
for _, hn := range config.Client.HostNetworks {
// Ensure port range is valid
if _, err := structs.ParsePortRanges(hn.ReservedPorts); err != nil {
c.Ui.Error(fmt.Sprintf("host_network[%q].reserved_ports %q invalid: %v",
hn.Name, hn.ReservedPorts, err))

View File

@ -207,8 +207,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
netIdx = NewNetworkIndex()
defer netIdx.Release()
if collision, reason := netIdx.SetNode(node); collision {
return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil
if err := netIdx.SetNode(node); err != nil {
// To maintain backward compatibility with when SetNode
// returned collision+reason like AddAllocs, return
// this as a reason instead of an error.
return false, fmt.Sprintf("reserved node port collision: %v", err), used, nil
}
if collision, reason := netIdx.AddAllocs(allocs); collision {
return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil
@ -530,6 +533,10 @@ func ParsePortRanges(spec string) ([]uint64, error) {
if err != nil {
return nil, err
}
if port > MaxValidPort {
return nil, fmt.Errorf("port must be < %d but found %d", MaxValidPort, port)
}
ports[port] = struct{}{}
}
case 2:

View File

@ -36,12 +36,32 @@ var (
// NetworkIndex is used to index the available network resources
// and the used network resources on a machine given allocations
//
// Fields are exported so they may be JSON serialized for debugging.
// Fields are *not* intended to be used directly.
type NetworkIndex struct {
AvailNetworks []*NetworkResource // List of available networks
NodeNetworks []*NodeNetworkResource // List of available node networks
AvailAddresses map[string][]NodeNetworkAddress // Map of host network aliases to list of addresses
// TaskNetworks are the node networks available for
// task.resources.network asks.
TaskNetworks []*NetworkResource
// GroupNetworks are the node networks available for group.network
// asks.
GroupNetworks []*NodeNetworkResource
// HostNetworks indexes addresses by host network alias
HostNetworks map[string][]NodeNetworkAddress
// UsedPorts tracks which ports are used on a per-IP address basis. For
// example if a node has `network_interface=lo` and port 22 reserved,
// then on a dual stack loopback interface UsedPorts would contain:
// {
// "127.0.0.1": Bitmap{22},
// "::1": Bitmap{22},
// }
UsedPorts map[string]Bitmap
// Deprecated bandwidth fields
AvailBandwidth map[string]int // Bandwidth by device
UsedPorts map[string]Bitmap // Ports by IP
UsedBandwidth map[string]int // Bandwidth by device
MinDynamicPort int // The smallest dynamic port generated
@ -51,9 +71,9 @@ type NetworkIndex struct {
// NewNetworkIndex is used to construct a new network index
func NewNetworkIndex() *NetworkIndex {
return &NetworkIndex{
AvailAddresses: make(map[string][]NodeNetworkAddress),
AvailBandwidth: make(map[string]int),
HostNetworks: make(map[string][]NodeNetworkAddress),
UsedPorts: make(map[string]Bitmap),
AvailBandwidth: make(map[string]int),
UsedBandwidth: make(map[string]int),
MinDynamicPort: DefaultMinDynamicPort,
MaxDynamicPort: DefaultMaxDynamicPort,
@ -84,9 +104,9 @@ func (idx *NetworkIndex) Copy() *NetworkIndex {
c := new(NetworkIndex)
*c = *idx
c.AvailNetworks = copyNetworkResources(idx.AvailNetworks)
c.NodeNetworks = copyNodeNetworks(idx.NodeNetworks)
c.AvailAddresses = copyAvailAddresses(idx.AvailAddresses)
c.TaskNetworks = copyNetworkResources(idx.TaskNetworks)
c.GroupNetworks = copyNodeNetworks(idx.GroupNetworks)
c.HostNetworks = copyAvailAddresses(idx.HostNetworks)
if idx.AvailBandwidth != nil && len(idx.AvailBandwidth) == 0 {
c.AvailBandwidth = make(map[string]int)
} else {
@ -171,61 +191,141 @@ func (idx *NetworkIndex) Overcommitted() bool {
return false
}
// SetNode is used to setup the available network resources. Returns
// true if there is a collision
func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) {
// SetNode is used to initialize a node's network index with available IPs,
// reserved ports, and other details from a node's configuration and
// fingerprinting.
//
// SetNode must be idempotent as preemption causes SetNode to be called
// multiple times on the same NetworkIndex, only clearing UsedPorts between
// calls.
//
// An error is returned if the Node cannot produce a consistent NetworkIndex
// such as if reserved_ports are unparseable.
//
// Any errors returned by SetNode indicate a bug! The bug may lie in client
// code not properly validating its configuration or it may lie in improper
// Node object handling by servers. Users should not be able to cause SetNode
// to error. Data that cause SetNode to error should be caught upstream such as
// a client agent refusing to start with an invalid configuration.
func (idx *NetworkIndex) SetNode(node *Node) error {
// COMPAT(0.11): Remove in 0.11
// Grab the network resources, handling both new and old
var networks []*NetworkResource
// COMPAT(0.11): Deprecated. taskNetworks are only used for
// task.resources.network asks which have been deprecated since before
// 0.11.
// Grab the network resources, handling both new and old Node layouts
// from clients.
var taskNetworks []*NetworkResource
if node.NodeResources != nil && len(node.NodeResources.Networks) != 0 {
networks = node.NodeResources.Networks
taskNetworks = node.NodeResources.Networks
} else if node.Resources != nil {
networks = node.Resources.Networks
taskNetworks = node.Resources.Networks
}
// Reserved ports get merged downward. For example given an agent
// config:
//
// client.reserved.reserved_ports = "22"
// client.host_network["eth0"] = {reserved_ports = "80,443"}
// client.host_network["eth1"] = {reserved_ports = "1-1000"}
//
// Addresses on taskNetworks reserve port 22
// Addresses on eth0 reserve 22,80,443 (note 22 is also reserved!)
// Addresses on eth1 reserve 1-1000
globalResPorts := []uint{}
if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" {
resPorts, err := ParsePortRanges(node.ReservedResources.Networks.ReservedHostPorts)
if err != nil {
// This is a fatal error that should have been
// prevented by client validation.
return fmt.Errorf("error parsing reserved_ports: %w", err)
}
globalResPorts = make([]uint, len(resPorts))
for i, p := range resPorts {
globalResPorts[i] = uint(p)
}
} else if node.Reserved != nil {
// COMPAT(0.11): Remove after 0.11. Nodes stopped reporting
// reserved ports under Node.Reserved.Resources in #4750 / v0.9
for _, n := range node.Reserved.Networks {
used := idx.getUsedPortsFor(n.IP)
for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} {
for _, p := range ports {
if p.Value > MaxValidPort || p.Value < 0 {
// This is a fatal error that
// should have been prevented
// by validation upstream.
return fmt.Errorf("invalid port %d for reserved_ports", p.Value)
}
globalResPorts = append(globalResPorts, uint(p.Value))
used.Set(uint(p.Value))
}
}
// Reserve mbits
if n.Device != "" {
idx.UsedBandwidth[n.Device] += n.MBits
}
}
}
// Filter task networks down to those with a device. For example
// taskNetworks may contain a "bridge" interface which has no device
// set and cannot be used to fulfill asks.
for _, n := range taskNetworks {
if n.Device != "" {
idx.TaskNetworks = append(idx.TaskNetworks, n)
idx.AvailBandwidth[n.Device] = n.MBits
// Reserve ports
used := idx.getUsedPortsFor(n.IP)
for _, p := range globalResPorts {
used.Set(p)
}
}
}
// nodeNetworks are used for group.network asks.
var nodeNetworks []*NodeNetworkResource
if node.NodeResources != nil && len(node.NodeResources.NodeNetworks) != 0 {
nodeNetworks = node.NodeResources.NodeNetworks
}
// Add the available CIDR blocks
for _, n := range networks {
if n.Device != "" {
idx.AvailNetworks = append(idx.AvailNetworks, n)
idx.AvailBandwidth[n.Device] = n.MBits
}
}
// TODO: upgrade path?
// is it possible to get duplicates here?
for _, n := range nodeNetworks {
for _, a := range n.Addresses {
idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a)
if c, r := idx.AddReservedPortsForIP(a.ReservedPorts, a.Address); c {
collide = true
reason = fmt.Sprintf("collision when reserving ports for node network %s in node %s: %v", a.Alias, node.ID, r)
}
}
}
// COMPAT(0.11): Remove in 0.11
// Handle reserving ports, handling both new and old
if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" {
c, r := idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts)
collide = c
if collide {
reason = fmt.Sprintf("collision when reserving port range for node %s: %v", node.ID, r)
}
} else if node.Reserved != nil {
for _, n := range node.Reserved.Networks {
if c, r := idx.AddReserved(n); c {
collide = true
reason = fmt.Sprintf("collision when reserving network %s for node %s: %v", n.IP, node.ID, r)
// Index host networks by their unique alias for asks
// with group.network.port.host_network set.
idx.HostNetworks[a.Alias] = append(idx.HostNetworks[a.Alias], a)
// Mark reserved ports as used without worrying about
// collisions. This effectively merges
// client.reserved.reserved_ports into each
// host_network.
used := idx.getUsedPortsFor(a.Address)
for _, p := range globalResPorts {
used.Set(p)
}
// If ReservedPorts is set on the NodeNetwork, use it
// and the global reserved ports.
if a.ReservedPorts != "" {
rp, err := ParsePortRanges(a.ReservedPorts)
if err != nil {
// This is a fatal error that should
// have been prevented by validation
// upstream.
return fmt.Errorf("error parsing reserved_ports for network %q: %w", a.Alias, err)
}
for _, p := range rp {
used.Set(uint(p))
}
}
}
}
// Set dynamic port range (applies to all addresses)
if node.NodeResources != nil && node.NodeResources.MinDynamicPort > 0 {
idx.MinDynamicPort = node.NodeResources.MinDynamicPort
}
@ -234,11 +334,16 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) {
idx.MaxDynamicPort = node.NodeResources.MaxDynamicPort
}
return
return nil
}
// AddAllocs is used to add the used network resources. Returns
// true if there is a collision
//
// AddAllocs may be called multiple times for the same NetworkIndex with
// UsedPorts cleared between calls (by Release). Therefore AddAllocs must be
// determistic and must not manipulate state outside of UsedPorts as that state
// would persist between Release calls.
func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) {
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
@ -338,51 +443,11 @@ func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, r
return
}
// AddReservedPortRange marks the ports given as reserved on all network
// interfaces. The port format is comma delimited, with spans given as n1-n2
// (80,100-200,205)
func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool, reasons []string) {
// Convert the ports into a slice of ints
resPorts, err := ParsePortRanges(ports)
if err != nil {
return
}
// Ensure we create a bitmap for each available network
for _, n := range idx.AvailNetworks {
idx.getUsedPortsFor(n.IP)
}
for _, used := range idx.UsedPorts {
for _, port := range resPorts {
// Guard against invalid port
if port >= MaxValidPort {
return true, []string{fmt.Sprintf("invalid port %d", port)}
}
if used.Check(uint(port)) {
collide = true
reason := fmt.Sprintf("port %d already in use", port)
reasons = append(reasons, reason)
} else {
used.Set(uint(port))
}
}
}
return
}
// AddReservedPortsForIP checks whether any reserved ports collide with those
// in use for the IP address.
func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool, reasons []string) {
// Convert the ports into a slice of ints
resPorts, err := ParsePortRanges(ports)
if err != nil {
return
}
func (idx *NetworkIndex) AddReservedPortsForIP(ports []uint64, ip string) (collide bool, reasons []string) {
used := idx.getUsedPortsFor(ip)
for _, port := range resPorts {
for _, port := range ports {
// Guard against invalid port
if port >= MaxValidPort {
return true, []string{fmt.Sprintf("invalid port %d", port)}
@ -401,22 +466,13 @@ func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide
// yieldIP is used to iteratively invoke the callback with
// an available IP
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
inc := func(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
for _, n := range idx.AvailNetworks {
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, offerIP net.IP) bool) {
for _, n := range idx.TaskNetworks {
ip, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incIP(ip) {
if cb(n, ip) {
return
}
@ -424,6 +480,26 @@ func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool)
}
}
func incIP(ip net.IP) {
// Iterate over IP octects from right to left
for j := len(ip) - 1; j >= 0; j-- {
// Increment octect
ip[j]++
// If this octect did not wrap around to 0, it's the next IP to
// try. If it did wrap (p[j]==0), then the next octect is
// incremented.
if ip[j] > 0 {
break
}
}
}
// AssignPorts based on an ask from the scheduler processing a group.network
// stanza. Supports multi-interfaces through node configured host_networks.
//
// AssignTaskNetwork supports the deprecated task.resources.network stanza.
func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) {
var offer AllocatedPorts
@ -437,7 +513,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
// if allocPort is still nil after the loop, the port wasn't available for reservation
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.AvailAddresses[port.HostNetwork] {
for _, addr := range idx.HostNetworks[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Guard against invalid port
if port.Value < 0 || port.Value >= MaxValidPort {
@ -472,7 +548,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
for _, port := range ask.DynamicPorts {
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.AvailAddresses[port.HostNetwork] {
for _, addr := range idx.HostNetworks[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Try to stochastically pick the dynamic ports as it is faster and
// lower memory usage.
@ -512,13 +588,18 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro
return offer, nil
}
// AssignNetwork is used to assign network resources given an ask.
// If the ask cannot be satisfied, returns nil
func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
// AssignTaskNetwork is used to offer network resources given a
// task.resources.network ask. If the ask cannot be satisfied, returns nil
//
// AssignTaskNetwork and task.resources.network are deprecated in favor of
// AssignPorts and group.network. AssignTaskNetwork does not support multiple
// interfaces and only uses the node's default interface. AssignPorts is the
// method that is used for group.network asks.
func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
err = fmt.Errorf("no networks available")
idx.yieldIP(func(n *NetworkResource, ip net.IP) (stop bool) {
idx.yieldIP(func(n *NetworkResource, offerIP net.IP) (stop bool) {
// Convert the IP to a string
ipStr := ip.String()
offerIPStr := offerIP.String()
// Check if we would exceed the bandwidth cap
availBandwidth := idx.AvailBandwidth[n.Device]
@ -528,7 +609,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
return
}
used := idx.UsedPorts[ipStr]
used := idx.UsedPorts[offerIPStr]
// Check if any of the reserved ports are in use
for _, port := range ask.ReservedPorts {
@ -549,7 +630,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
offer := &NetworkResource{
Mode: ask.Mode,
Device: n.Device,
IP: ipStr,
IP: offerIPStr,
MBits: ask.MBits,
DNS: ask.DNS,
ReservedPorts: ask.ReservedPorts,

View File

@ -189,20 +189,10 @@ func TestNetworkIndex_SetNode(t *testing.T) {
},
},
}
collide, reason := idx.SetNode(n)
if collide || reason != "" {
t.Fatalf("bad")
}
if len(idx.AvailNetworks) != 1 {
t.Fatalf("Bad")
}
if idx.AvailBandwidth["eth0"] != 1000 {
t.Fatalf("Bad")
}
if !idx.UsedPorts["192.168.0.100"].Check(22) {
t.Fatalf("Bad")
}
require.NoError(t, idx.SetNode(n))
require.Len(t, idx.TaskNetworks, 1)
require.Equal(t, 1000, idx.AvailBandwidth["eth0"])
require.True(t, idx.UsedPorts["192.168.0.100"].Check(22))
}
func TestNetworkIndex_AddAllocs(t *testing.T) {
@ -327,7 +317,7 @@ func TestNetworkIndex_yieldIP(t *testing.T) {
}
}
func TestNetworkIndex_AssignNetwork(t *testing.T) {
func TestNetworkIndex_AssignTaskNetwork(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
n := &Node{
@ -379,7 +369,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
ask := &NetworkResource{
ReservedPorts: []Port{{"main", 8000, 0, ""}},
}
offer, err := idx.AssignNetwork(ask)
offer, err := idx.AssignTaskNetwork(ask)
require.NoError(t, err)
require.NotNil(t, offer)
require.Equal(t, "192.168.0.101", offer.IP)
@ -391,7 +381,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
ask = &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, -1, ""}},
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
require.NoError(t, err)
require.NotNil(t, offer)
require.Equal(t, "192.168.0.100", offer.IP)
@ -410,7 +400,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
ReservedPorts: []Port{{"main", 2345, 0, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
require.NoError(t, err)
require.NotNil(t, offer)
require.Equal(t, "192.168.0.100", offer.IP)
@ -423,7 +413,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
ask = &NetworkResource{
MBits: 1000,
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
require.Error(t, err)
require.Equal(t, "bandwidth exceeded", err.Error())
require.Nil(t, offer)
@ -431,7 +421,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
// This test ensures that even with a small domain of available ports we are
// able to make a dynamic port allocation.
func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) {
func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention(t *testing.T) {
ci.Parallel(t)
// Create a node that only has one free port
@ -459,7 +449,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) {
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}},
}
offer, err := idx.AssignNetwork(ask)
offer, err := idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -503,23 +493,11 @@ func TestNetworkIndex_SetNode_Old(t *testing.T) {
},
},
}
collide, reason := idx.SetNode(n)
if collide || reason != "" {
t.Fatalf("bad")
}
if len(idx.AvailNetworks) != 1 {
t.Fatalf("Bad")
}
if idx.AvailBandwidth["eth0"] != 1000 {
t.Fatalf("Bad")
}
if idx.UsedBandwidth["eth0"] != 1 {
t.Fatalf("Bad")
}
if !idx.UsedPorts["192.168.0.100"].Check(22) {
t.Fatalf("Bad")
}
require.NoError(t, idx.SetNode(n))
require.Len(t, idx.TaskNetworks, 1)
require.Equal(t, 1000, idx.AvailBandwidth["eth0"])
require.Equal(t, 1, idx.UsedBandwidth["eth0"])
require.True(t, idx.UsedPorts["192.168.0.100"].Check(22))
}
// COMPAT(0.11): Remove in 0.11
@ -618,7 +596,7 @@ func TestNetworkIndex_yieldIP_Old(t *testing.T) {
}
// COMPAT(0.11): Remove in 0.11
func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
func TestNetworkIndex_AssignTaskNetwork_Old(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
@ -681,7 +659,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
ask := &NetworkResource{
ReservedPorts: []Port{{"main", 8000, 0, ""}},
}
offer, err := idx.AssignNetwork(ask)
offer, err := idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -700,7 +678,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
ask = &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -724,7 +702,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
ReservedPorts: []Port{{"main", 2345, 0, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -744,7 +722,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
ask = &NetworkResource{
MBits: 1000,
}
offer, err = idx.AssignNetwork(ask)
offer, err = idx.AssignTaskNetwork(ask)
if err.Error() != "bandwidth exceeded" {
t.Fatalf("err: %v", err)
}
@ -756,7 +734,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
// COMPAT(0.11): Remove in 0.11
// This test ensures that even with a small domain of available ports we are
// able to make a dynamic port allocation.
func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) {
func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention_Old(t *testing.T) {
ci.Parallel(t)
// Create a node that only has one free port
@ -791,7 +769,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) {
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80, ""}},
}
offer, err := idx.AssignNetwork(ask)
offer, err := idx.AssignTaskNetwork(ask)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -823,3 +801,116 @@ func TestIntContains(t *testing.T) {
t.Fatalf("bad")
}
}
func TestNetworkIndex_SetNode_HostNets(t *testing.T) {
ci.Parallel(t)
idx := NewNetworkIndex()
n := &Node{
NodeResources: &NodeResources{
Networks: []*NetworkResource{
// As of Nomad v1.3 bridge networks get
// registered with only their mode set.
{
Mode: "bridge",
},
// Localhost (agent interface)
{
CIDR: "127.0.0.1/32",
Device: "lo",
IP: "127.0.0.1",
MBits: 1000,
Mode: "host",
},
{
CIDR: "::1/128",
Device: "lo",
IP: "::1",
MBits: 1000,
Mode: "host",
},
// Node.NodeResources.Networks does *not*
// contain host_networks.
},
NodeNetworks: []*NodeNetworkResource{
// As of Nomad v1.3 bridge networks get
// registered with only their mode set.
{
Mode: "bridge",
},
{
Addresses: []NodeNetworkAddress{
{
Address: "127.0.0.1",
Alias: "default",
Family: "ipv4",
},
{
Address: "::1",
Alias: "default",
Family: "ipv6",
},
},
Device: "lo",
Mode: "host",
Speed: 1000,
},
{
Addresses: []NodeNetworkAddress{
{
Address: "192.168.0.1",
Alias: "eth0",
Family: "ipv4",
ReservedPorts: "22",
},
},
Device: "enxaaaaaaaaaaaa",
MacAddress: "aa:aa:aa:aa:aa:aa",
Mode: "host",
Speed: 1000,
},
{
Addresses: []NodeNetworkAddress{
{
Address: "192.168.1.1",
Alias: "eth1",
Family: "ipv4",
ReservedPorts: "80",
},
},
Device: "enxbbbbbbbbbbbb",
MacAddress: "bb:bb:bb:bb:bb:bb",
Mode: "host",
Speed: 1000,
},
},
},
ReservedResources: &NodeReservedResources{
Networks: NodeReservedNetworkResources{
ReservedHostPorts: "22",
},
},
}
require.NoError(t, idx.SetNode(n))
// TaskNetworks should only contain the bridge and agent network
require.Len(t, idx.TaskNetworks, 2)
// Ports should be used across all 4 IPs
require.Equal(t, 4, len(idx.UsedPorts))
// 22 should be reserved on all IPs
require.True(t, idx.UsedPorts["127.0.0.1"].Check(22))
require.True(t, idx.UsedPorts["::1"].Check(22))
require.True(t, idx.UsedPorts["192.168.0.1"].Check(22))
require.True(t, idx.UsedPorts["192.168.1.1"].Check(22))
// 80 should only be reserved on eth1's address
require.False(t, idx.UsedPorts["127.0.0.1"].Check(80))
require.False(t, idx.UsedPorts["::1"].Check(80))
require.False(t, idx.UsedPorts["192.168.0.1"].Check(80))
require.True(t, idx.UsedPorts["192.168.1.1"].Check(80))
}

View File

@ -2913,10 +2913,20 @@ type NodeResources struct {
Cpu NodeCpuResources
Memory NodeMemoryResources
Disk NodeDiskResources
Networks Networks
NodeNetworks []*NodeNetworkResource
Devices []*NodeDeviceResource
// NodeNetworks was added in Nomad 0.12 to support multiple interfaces.
// It is the superset of host_networks, fingerprinted networks, and the
// node's default interface.
NodeNetworks []*NodeNetworkResource
// Networks is the node's bridge network and default interface. It is
// only used when scheduling jobs with a deprecated
// task.resources.network stanza.
Networks Networks
// MinDynamicPort and MaxDynamicPort represent the inclusive port range
// to select dynamic ports from across all networks.
MinDynamicPort int
MaxDynamicPort int
}
@ -2993,17 +3003,8 @@ func (n *NodeResources) Merge(o *NodeResources) {
}
if len(o.NodeNetworks) != 0 {
lookupNetwork := func(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) {
for i, nw := range nets {
if nw.Device == name {
return i, nw
}
}
return 0, nil
}
for _, nw := range o.NodeNetworks {
if i, nnw := lookupNetwork(n.NodeNetworks, nw.Device); nnw != nil {
if i, nnw := lookupNetworkByDevice(n.NodeNetworks, nw.Device); nnw != nil {
n.NodeNetworks[i] = nw
} else {
n.NodeNetworks = append(n.NodeNetworks, nw)
@ -3012,6 +3013,15 @@ func (n *NodeResources) Merge(o *NodeResources) {
}
}
func lookupNetworkByDevice(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) {
for i, nw := range nets {
if nw.Device == name {
return i, nw
}
}
return 0, nil
}
func (n *NodeResources) Equals(o *NodeResources) bool {
if o == nil && n == nil {
return true

View File

@ -435,7 +435,7 @@ func TestPortCollisionEvent_Copy(t *testing.T) {
evCopy.Allocations = append(evCopy.Allocations, mock.Alloc())
require.NotEqual(t, ev.Allocations, evCopy.Allocations)
evCopy.NetIndex.AddReservedPortRange("1000-2000")
evCopy.NetIndex.AddAllocs(evCopy.Allocations)
require.NotEqual(t, ev.NetIndex, evCopy.NetIndex)
}

View File

@ -417,13 +417,13 @@ func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool {
}
found := false
for _, net := range option.NodeResources.NodeNetworks {
if net.HasAlias(hostNetworkValue.(string)) {
if net.HasAlias(hostNetworkValue) {
found = true
break
}
}
if !found {
c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue.(string), port.Label))
c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", hostNetworkValue, port.Label))
return false
}
}
@ -766,7 +766,7 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti
}
// resolveTarget is used to resolve the LTarget and RTarget of a Constraint.
func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
func resolveTarget(target string, node *structs.Node) (string, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return target, true
@ -797,7 +797,7 @@ func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
return val, ok
default:
return nil, false
return "", false
}
}

View File

@ -342,14 +342,5 @@ func getProperty(n *structs.Node, property string) (string, bool) {
return "", false
}
val, ok := resolveTarget(property, n)
if !ok {
return "", false
}
nodeValue, ok := val.(string)
if !ok {
return "", false
}
return nodeValue, true
return resolveTarget(property, n)
}

View File

@ -211,13 +211,13 @@ OUTER:
// the node. If it does collide though, it means we found a bug! So
// collect as much information as possible.
netIdx := structs.NewNetworkIndex()
if collide, reason := netIdx.SetNode(option.Node); collide {
if err := netIdx.SetNode(option.Node); err != nil {
iter.ctx.SendEvent(&PortCollisionEvent{
Reason: reason,
Reason: err.Error(),
NetIndex: netIdx.Copy(),
Node: option.Node,
})
iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision")
iter.ctx.Metrics().ExhaustedNode(option.Node, "network: invalid node")
continue
}
if collide, reason := netIdx.AddAllocs(proposed); collide {
@ -274,7 +274,7 @@ OUTER:
for i, port := range ask.DynamicPorts {
if port.HostNetwork != "" {
if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk {
ask.DynamicPorts[i].HostNetwork = hostNetworkValue.(string)
ask.DynamicPorts[i].HostNetwork = hostNetworkValue
} else {
iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label))
netIdx.Release()
@ -285,7 +285,7 @@ OUTER:
for i, port := range ask.ReservedPorts {
if port.HostNetwork != "" {
if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk {
ask.ReservedPorts[i].HostNetwork = hostNetworkValue.(string)
ask.ReservedPorts[i].HostNetwork = hostNetworkValue
} else {
iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label))
netIdx.Release()
@ -363,7 +363,7 @@ OUTER:
// Check if we need a network resource
if len(task.Resources.Networks) > 0 {
ask := task.Resources.Networks[0].Copy()
offer, err := netIdx.AssignNetwork(ask)
offer, err := netIdx.AssignTaskNetwork(ask)
if offer == nil {
// If eviction is not enabled, mark this node as exhausted and continue
if !iter.evict {
@ -393,7 +393,7 @@ OUTER:
netIdx.SetNode(option.Node)
netIdx.AddAllocs(proposed)
offer, err = netIdx.AssignNetwork(ask)
offer, err = netIdx.AssignTaskNetwork(ask)
if offer == nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
netIdx.Release()

View File

@ -492,12 +492,13 @@ func TestBinPackIterator_Network_Failure(t *testing.T) {
require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"])
}
func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) {
func TestBinPackIterator_Network_NoCollision_Node(t *testing.T) {
_, ctx := testContext(t)
eventsCh := make(chan interface{})
ctx.eventsCh = eventsCh
// Collide on host with duplicate IPs.
// Host networks can have overlapping addresses in which case their
// reserved ports are merged.
nodes := []*RankedNode{
{
Node: &structs.Node{
@ -577,9 +578,110 @@ func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) {
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
// We expect a placement failure due to port collision.
// Placement should succeed since reserved ports are merged instead of
// treating them as a collision
require.Len(t, out, 1)
}
// TestBinPackIterator_Network_NodeError asserts that NetworkIndex.SetNode can
// return an error and cause a node to be infeasible.
//
// This should never happen as it indicates "bad" configuration was either not
// caught by validation or caused by bugs in serverside Node handling.
func TestBinPackIterator_Network_NodeError(t *testing.T) {
_, ctx := testContext(t)
eventsCh := make(chan interface{})
ctx.eventsCh = eventsCh
nodes := []*RankedNode{
{
Node: &structs.Node{
ID: uuid.Generate(),
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
IP: "192.158.0.100",
},
},
},
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 4096,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 4096,
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
IP: "192.158.0.100",
},
},
NodeNetworks: []*structs.NodeNetworkResource{
{
Mode: "host",
Device: "eth0",
Addresses: []structs.NodeNetworkAddress{
{
Alias: "default",
Address: "192.168.0.100",
ReservedPorts: "22,80",
},
{
Alias: "private",
Address: "192.168.0.100",
ReservedPorts: "22",
},
},
},
},
},
ReservedResources: &structs.NodeReservedResources{
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "not-valid-ports",
},
},
},
},
}
static := NewStaticRankIterator(ctx, nodes)
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
},
},
},
},
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
binp.SetTaskGroup(taskGroup)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
// We expect a placement failure because the node has invalid reserved
// ports
require.Len(t, out, 0)
require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"])
require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: invalid node"],
ctx.metrics.DimensionExhausted)
}
func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) {

View File

@ -206,9 +206,12 @@ chroot as doing so would cause infinite recursion.
- `disk` `(int: 0)` - Specifies the amount of disk to reserve, in MB.
- `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports to
reserve on all fingerprinted network devices. Ranges can be specified by using
a hyphen separated the two inclusive ends.
- `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports
to reserve on all fingerprinted network devices. Ranges can be specified by
using a hyphen separating the two inclusive ends. See also
[`host_network`](#host_network-stanza) for reserving ports on specific host
networks.
### `artifact` Parameters
@ -396,8 +399,10 @@ client {
- `interface` `(string: "")` - Filters searching of addresses to a specific interface.
- `reserved_ports` `(string: "")` - Specifies a comma-separated list of ports to
reserve on all fingerprinted network devices. Ranges can be specified by using
reserve on all addresses associated with this network. Ranges can be specified by using
a hyphen separating the two inclusive ends.
[`reserved.reserved_ports`](#reserved_ports) are also reserved on each host
network.
## `client` Examples