Merge pull request #8208 from hashicorp/f-multi-network

multi-interface network support
This commit is contained in:
Michael Schurter 2020-06-19 15:46:48 -07:00 committed by GitHub
commit 562704124d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 781 additions and 227 deletions

View File

@ -20,7 +20,7 @@ func TestCompose(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: intToPtr(100),
ReservedPorts: []Port{{"", 80, 0}, {"", 443, 0}},
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
},
},
})
@ -111,8 +111,8 @@ func TestCompose(t *testing.T) {
CIDR: "0.0.0.0/0",
MBits: intToPtr(100),
ReservedPorts: []Port{
{"", 80, 0},
{"", 443, 0},
{"", 80, 0, ""},
{"", 443, 0, ""},
},
},
},

View File

@ -87,6 +87,7 @@ type Port struct {
Label string
Value int `mapstructure:"static"`
To int `mapstructure:"to"`
HostNetwork string `mapstructure:"host_network"`
}
type DNSConfig struct {

View File

@ -269,7 +269,7 @@ func TestTask_Require(t *testing.T) {
{
CIDR: "0.0.0.0/0",
MBits: intToPtr(100),
ReservedPorts: []Port{{"", 80, 0}, {"", 443, 0}},
ReservedPorts: []Port{{"", 80, 0, ""}, {"", 443, 0, ""}},
},
},
}

View File

@ -164,10 +164,12 @@ func (c *cniNetworkConfigurator) ensureCNIInitialized() error {
// portmapping capability arguments for the portmap CNI plugin
func getPortMapping(alloc *structs.Allocation) []cni.PortMapping {
ports := []cni.PortMapping{}
if len(alloc.AllocatedResources.Shared.Ports) == 0 && len(alloc.AllocatedResources.Shared.Networks) > 0 {
for _, network := range alloc.AllocatedResources.Shared.Networks {
for _, port := range append(network.DynamicPorts, network.ReservedPorts...) {
if port.To < 1 {
continue
port.To = port.Value
}
for _, proto := range []string{"tcp", "udp"} {
ports = append(ports, cni.PortMapping{
@ -178,5 +180,20 @@ func getPortMapping(alloc *structs.Allocation) []cni.PortMapping {
}
}
}
} else {
for _, port := range alloc.AllocatedResources.Shared.Ports {
if port.To < 1 {
port.To = port.Value
}
for _, proto := range []string{"tcp", "udp"} {
ports = append(ports, cni.PortMapping{
HostPort: int32(port.Value),
ContainerPort: int32(port.To),
Protocol: proto,
HostIP: port.HostIP,
})
}
}
}
return ports
}

View File

@ -1114,6 +1114,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
expectedResources := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
// injected
@ -1151,6 +1152,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
expectedResources2 := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
// injected

View File

@ -254,6 +254,9 @@ type Config struct {
// HostVolumes is a map of the configured host volumes by name.
HostVolumes map[string]*structs.ClientHostVolumeConfig
// HostNetworks is a map of the conigured host networks by name.
HostNetworks map[string]*structs.ClientHostNetworkConfig
}
type ClientTemplateConfig struct {
@ -313,6 +316,7 @@ func DefaultConfig() *Config {
CNIPath: "/opt/cni/bin",
CNIConfigDir: "/opt/cni/config",
CNIInterfacePrefix: "eth",
HostNetworks: map[string]*structs.ClientHostNetworkConfig{},
}
}

View File

@ -23,6 +23,12 @@ func (f *BridgeFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerpri
Mode: "bridge",
},
},
NodeNetworks: []*structs.NodeNetworkResource{
{
Mode: "bridge",
Device: req.Config.BridgeNetworkName,
},
},
}
resp.Detected = true
return nil

View File

@ -58,16 +58,22 @@ func (f *CNIFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintR
}
var nodeNetworks structs.Networks
var newNodeNetworks []*structs.NodeNetworkResource
for name := range networks {
mode := fmt.Sprintf("cni/%s", name)
nodeNetworks = append(nodeNetworks, &structs.NetworkResource{
Mode: fmt.Sprintf("cni/%s", name),
Mode: mode,
})
newNodeNetworks = append(newNodeNetworks, &structs.NodeNetworkResource{
Mode: mode,
})
f.logger.Debug("detected CNI network", "name", name)
}
resp.NodeResources = &structs.NodeResources{
Networks: nodeNetworks,
NodeNetworks: newNodeNetworks,
}
resp.Detected = true

View File

@ -3,9 +3,12 @@ package fingerprint
import (
"fmt"
"net"
"strings"
log "github.com/hashicorp/go-hclog"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -113,11 +116,135 @@ func (f *NetworkFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerpr
if len(nwResources) > 0 {
resp.AddAttribute("unique.network.ip-address", nwResources[0].IP)
}
ifaces, err := f.interfaceDetector.Interfaces()
if err != nil {
return err
}
nodeNetResources, err := f.createNodeNetworkResources(ifaces, disallowLinkLocal, req.Config)
if err != nil {
return err
}
resp.NodeResources.NodeNetworks = nodeNetResources
resp.Detected = true
return nil
}
func (f *NetworkFingerprint) createNodeNetworkResources(ifaces []net.Interface, disallowLinkLocal bool, conf *config.Config) ([]*structs.NodeNetworkResource, error) {
nets := make([]*structs.NodeNetworkResource, 0)
for _, iface := range ifaces {
speed := f.linkSpeed(iface.Name)
if speed == 0 {
speed = defaultNetworkSpeed
f.logger.Debug("link speed could not be detected, falling back to default speed", "mbits", defaultNetworkSpeed)
}
newNetwork := &structs.NodeNetworkResource{
Mode: "host",
Device: iface.Name,
MacAddress: iface.HardwareAddr.String(),
Speed: speed,
}
addrs, err := f.interfaceDetector.Addrs(&iface)
if err != nil {
return nil, err
}
var networkAddrs, linkLocalAddrs []structs.NodeNetworkAddress
for _, addr := range addrs {
// Find the IP Addr and the CIDR from the Address
var ip net.IP
var family structs.NodeNetworkAF
switch v := (addr).(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.To4() != nil {
family = structs.NodeNetworkAF_IPv4
} else {
family = structs.NodeNetworkAF_IPv6
}
newAddr := structs.NodeNetworkAddress{
Address: ip.String(),
Family: family,
Alias: deriveAddressAlias(iface, ip, conf),
}
if newAddr.Alias != "" {
if ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
linkLocalAddrs = append(linkLocalAddrs, newAddr)
} else {
networkAddrs = append(networkAddrs, newAddr)
}
}
}
if len(networkAddrs) == 0 && len(linkLocalAddrs) > 0 {
if disallowLinkLocal {
f.logger.Debug("ignoring detected link-local address on interface", "interface", iface.Name)
} else {
newNetwork.Addresses = linkLocalAddrs
}
} else {
newNetwork.Addresses = networkAddrs
}
if len(newNetwork.Addresses) > 0 {
nets = append(nets, newNetwork)
}
}
return nets, nil
}
func deriveAddressAlias(iface net.Interface, addr net.IP, config *config.Config) string {
for name, conf := range config.HostNetworks {
var cidrMatch, ifaceMatch bool
if conf.CIDR != "" {
for _, cidr := range strings.Split(conf.CIDR, ",") {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
continue
}
if ipnet.Contains(addr) {
cidrMatch = true
break
}
}
} else {
cidrMatch = true
}
if conf.Interface != "" {
ifaceName, err := template.Parse(conf.Interface)
if err != nil {
continue
}
if ifaceName == iface.Name {
ifaceMatch = true
}
} else {
ifaceMatch = true
}
if cidrMatch && ifaceMatch {
return name
}
}
ri, err := sockaddr.NewRouteInfo()
if err == nil {
defaultIface, err := ri.GetDefaultInterfaceName()
if err == nil && iface.Name == defaultIface {
return "default"
}
}
return ""
}
// createNetworkResources creates network resources for every IP
func (f *NetworkFingerprint) createNetworkResources(throughput int, intf *net.Interface, disallowLinkLocal bool) ([]*structs.NetworkResource, error) {
// Find the interface with the name

View File

@ -6,6 +6,7 @@ import (
"os"
"testing"
"github.com/davecgh/go-spew/spew"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
@ -197,6 +198,8 @@ func TestNetworkFingerprint_basic(t *testing.T) {
t.Fatalf("err: %v", err)
}
spew.Dump(response)
os.Exit(0)
if !response.Detected {
t.Fatalf("expected response to be applicable")
}

View File

@ -70,15 +70,21 @@ const (
// The ip:port are always the host's.
AddrPrefix = "NOMAD_ADDR_"
HostAddrPrefix = "NOMAD_HOST_ADDR_"
// IpPrefix is the prefix for passing the host IP of a port allocation
// to a task.
IpPrefix = "NOMAD_IP_"
HostIpPrefix = "NOMAD_HOST_IP_"
// PortPrefix is the prefix for passing the port allocation to a task.
// It will be the task's port if a port map is specified. Task's should
// bind to this port.
PortPrefix = "NOMAD_PORT_"
AllocPortPrefix = "NOMAD_ALLOC_PORT_"
// HostPortPrefix is the prefix for passing the host port when a port
// map is specified.
HostPortPrefix = "NOMAD_HOST_PORT_"
@ -620,6 +626,7 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
}
}
// COMPAT(1.0): remove in 1.0 when AllocatedPorts can be used exclusively
// Add ports from other tasks
for taskName, resources := range alloc.AllocatedResources.Tasks {
// Add ports from other tasks
@ -637,6 +644,7 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
}
}
// COMPAT(1.0): remove in 1.0 when AllocatedPorts can be used exclusively
// Add ports from group networks
//TODO Expose IPs but possibly only via variable interpolation
for _, nw := range alloc.AllocatedResources.Shared.Networks {
@ -647,6 +655,11 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
addGroupPort(b.otherPorts, p)
}
}
// Add any allocated host ports
if alloc.AllocatedResources.Shared.Ports != nil {
addPorts(b.otherPorts, alloc.AllocatedResources.Shared.Ports)
}
}
upstreams := []structs.ConsulUpstream{}
@ -857,3 +870,23 @@ func addGroupPort(m map[string]string, port structs.Port) {
m[HostPortPrefix+port.Label] = strconv.Itoa(port.Value)
}
func addPorts(m map[string]string, ports structs.AllocatedPorts) {
for _, p := range ports {
m[AddrPrefix+p.Label] = fmt.Sprintf("%s:%d", p.HostIP, p.Value)
m[HostAddrPrefix+p.Label] = fmt.Sprintf("%s:%d", p.HostIP, p.Value)
m[IpPrefix+p.Label] = p.HostIP
m[HostIpPrefix+p.Label] = p.HostIP
if p.To > 0 {
val := strconv.Itoa(p.To)
m[PortPrefix+p.Label] = val
m[AllocPortPrefix+p.Label] = val
} else {
val := strconv.Itoa(p.Value)
m[PortPrefix+p.Label] = val
m[AllocPortPrefix+p.Label] = val
}
m[HostPortPrefix+p.Label] = strconv.Itoa(p.Value)
}
}

View File

@ -360,6 +360,15 @@ func TestEnvironment_AllValues(t *testing.T) {
},
}
a.AllocatedResources.Shared.Ports = structs.AllocatedPorts{
{
Label: "admin",
Value: 32000,
To: 9000,
HostIP: "127.0.0.1",
},
}
sharedNet := a.AllocatedResources.Shared.Networks[0]
// Add group network port with only a host port.
@ -463,6 +472,13 @@ func TestEnvironment_AllValues(t *testing.T) {
"NOMAD_HOST_PORT_hostonly": "9998",
"NOMAD_PORT_static": "97",
"NOMAD_HOST_PORT_static": "9997",
"NOMAD_ADDR_admin": "127.0.0.1:32000",
"NOMAD_HOST_ADDR_admin": "127.0.0.1:32000",
"NOMAD_IP_admin": "127.0.0.1",
"NOMAD_HOST_IP_admin": "127.0.0.1",
"NOMAD_PORT_admin": "9000",
"NOMAD_ALLOC_PORT_admin": "9000",
"NOMAD_HOST_PORT_admin": "32000",
// 0.9 style env map
`env["taskEnvKey"]`: "taskEnvVal",

View File

@ -635,6 +635,10 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
conf.BridgeNetworkName = agentConfig.Client.BridgeNetworkName
conf.BridgeNetworkAllocSubnet = agentConfig.Client.BridgeNetworkSubnet
for _, hn := range agentConfig.Client.HostNetworks {
conf.HostNetworks[hn.Name] = hn
}
return conf, nil
}

View File

@ -286,6 +286,8 @@ type ClientConfig struct {
// the host
BridgeNetworkSubnet string `hcl:"bridge_network_subnet"`
HostNetworks []*structs.ClientHostNetworkConfig `hcl:"host_network"`
// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
@ -1531,6 +1533,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.BridgeNetworkSubnet = b.BridgeNetworkSubnet
}
if len(b.HostNetworks) != 0 {
result.HostNetworks = append(a.HostNetworks, b.HostNetworks...)
} else {
result.HostNetworks = a.HostNetworks
}
return &result
}

View File

@ -138,6 +138,12 @@ func extraKeys(c *Config) error {
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_volume")
}
// Remove HostNetwork extra keys
for _, hn := range c.Client.HostNetworks {
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, hn.Name)
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_network")
}
// Remove AuditConfig extra keys
for _, f := range c.Audit.Filters {
helper.RemoveEqualFold(&c.Audit.ExtraKeysHCL, f.Name)

View File

@ -1142,22 +1142,14 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
if l := len(nw.DynamicPorts); l != 0 {
out[i].DynamicPorts = make([]structs.Port, l)
for j, dp := range nw.DynamicPorts {
out[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
To: dp.To,
}
out[i].DynamicPorts[j] = ApiPortToStructs(dp)
}
}
if l := len(nw.ReservedPorts); l != 0 {
out[i].ReservedPorts = make([]structs.Port, l)
for j, rp := range nw.ReservedPorts {
out[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
To: rp.To,
}
out[i].ReservedPorts[j] = ApiPortToStructs(rp)
}
}
}
@ -1165,6 +1157,15 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
return out
}
func ApiPortToStructs(in api.Port) structs.Port {
return structs.Port{
Label: in.Label,
Value: in.Value,
To: in.To,
HostNetwork: in.HostNetwork,
}
}
//TODO(schmichael) refactor and reuse in service parsing above
func ApiServicesToStructs(in []*api.Service) []*structs.Service {
if len(in) == 0 {

1
go.mod
View File

@ -36,6 +36,7 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cyphar/filepath-securejoin v0.2.3-0.20190205144030-7efe413b52e1 // indirect
github.com/davecgh/go-spew v1.1.1
github.com/docker/cli v0.0.0-20200303215952-eb310fca4956
github.com/docker/distribution v2.7.1+incompatible
github.com/docker/docker v17.12.0-ce-rc1.0.20200330121334-7f8b4b621b5d+incompatible

View File

@ -79,6 +79,7 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error {
valid := []string{
"static",
"to",
"host_network",
}
if err := helper.CheckHCLKeys(port.Val, valid); err != nil {
return err

View File

@ -1016,6 +1016,7 @@ func TestParse(t *testing.T) {
Label: "http",
Value: 80,
To: 8080,
HostNetwork: "public",
},
},
DNS: &api.DNSConfig{

View File

@ -11,6 +11,7 @@ job "foo" {
port "http" {
static = 80
to = 8080
host_network = "public"
}
dns {

View File

@ -71,6 +71,13 @@ func Node() *structs.Node {
MBits: 1000,
},
},
NodeNetworks: []*structs.NodeNetworkResource{
{
Mode: "host",
Device: "eth0",
Speed: 1000,
},
},
},
ReservedResources: &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{

View File

@ -2933,6 +2933,7 @@ func TestTaskGroupDiff(t *testing.T) {
{
Label: "bar",
To: 8081,
HostNetwork: "public",
},
},
DNS: &DNSConfig{
@ -2966,6 +2967,12 @@ func TestTaskGroupDiff(t *testing.T) {
Type: DiffTypeAdded,
Name: "Dynamic Port",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "HostNetwork",
Old: "",
New: "public",
},
{
Type: DiffTypeAdded,
Name: "Label",
@ -3016,6 +3023,12 @@ func TestTaskGroupDiff(t *testing.T) {
Type: DiffTypeDeleted,
Name: "Static Port",
Fields: []*FieldDiff{
{
Type: DiffTypeNone,
Name: "HostNetwork",
Old: "",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Label",
@ -4561,6 +4574,12 @@ func TestTaskDiff(t *testing.T) {
Old: "2",
New: "2",
},
{
Type: DiffTypeNone,
Name: "boom.HostNetwork",
Old: "",
New: "",
},
{
Type: DiffTypeNone,
Name: "boom.Label",

View File

@ -108,7 +108,7 @@ func TestAllocsFit_PortsOvercommitted_Old(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 80}},
ReservedPorts: []Port{{"main", 8000, 80, ""}},
},
},
},
@ -160,7 +160,7 @@ func TestAllocsFit_Old(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 80, 0}},
ReservedPorts: []Port{{"main", 80, 0, ""}},
},
},
},
@ -176,7 +176,7 @@ func TestAllocsFit_Old(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 80}},
ReservedPorts: []Port{{"main", 8000, 80, ""}},
},
},
},
@ -227,7 +227,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 80, 0}},
ReservedPorts: []Port{{"main", 80, 0, ""}},
},
},
},
@ -243,7 +243,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 0}},
ReservedPorts: []Port{{"main", 8000, 0, ""}},
},
},
},
@ -323,7 +323,7 @@ func TestAllocsFit(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 0}},
ReservedPorts: []Port{{"main", 8000, 0, ""}},
},
},
},
@ -407,7 +407,7 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) {
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 80}},
ReservedPorts: []Port{{"main", 8000, 80, ""}},
},
},
},

View File

@ -34,6 +34,8 @@ var (
// and the used network resources on a machine given allocations
type NetworkIndex struct {
AvailNetworks []*NetworkResource // List of available networks
NodeNetworks []*NodeNetworkResource // List of available node networks
AvailAddresses map[string][]NodeNetworkAddress // Map of host network aliases to list of addresses
AvailBandwidth map[string]int // Bandwidth by device
UsedPorts map[string]Bitmap // Ports by IP
UsedBandwidth map[string]int // Bandwidth by device
@ -42,12 +44,29 @@ type NetworkIndex struct {
// NewNetworkIndex is used to construct a new network index
func NewNetworkIndex() *NetworkIndex {
return &NetworkIndex{
AvailAddresses: make(map[string][]NodeNetworkAddress),
AvailBandwidth: make(map[string]int),
UsedPorts: make(map[string]Bitmap),
UsedBandwidth: make(map[string]int),
}
}
func (idx *NetworkIndex) getUsedPortsFor(ip string) Bitmap {
used := idx.UsedPorts[ip]
if used == nil {
// Try to get a bitmap from the pool, else create
raw := bitmapPool.Get()
if raw != nil {
used = raw.(Bitmap)
used.Clear()
} else {
used, _ = NewBitmap(maxValidPort)
}
idx.UsedPorts[ip] = used
}
return used
}
// Release is called when the network index is no longer needed
// to attempt to re-use some of the memory it has allocated
func (idx *NetworkIndex) Release() {
@ -58,12 +77,13 @@ func (idx *NetworkIndex) Release() {
// Overcommitted checks if the network is overcommitted
func (idx *NetworkIndex) Overcommitted() bool {
for device, used := range idx.UsedBandwidth {
// TODO remove since bandwidth is deprecated
/*for device, used := range idx.UsedBandwidth {
avail := idx.AvailBandwidth[device]
if used > avail {
return true
}
}
}*/
return false
}
@ -80,6 +100,11 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
networks = node.Resources.Networks
}
var nodeNetworks []*NodeNetworkResource
if node.NodeResources != nil && len(node.NodeResources.NodeNetworks) != 0 {
nodeNetworks = node.NodeResources.NodeNetworks
}
// Add the available CIDR blocks
for _, n := range networks {
if n.Device != "" {
@ -88,6 +113,17 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) {
}
}
// TODO: upgrade path?
// is it possible to get duplicates here?
for _, n := range nodeNetworks {
for _, a := range n.Addresses {
idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a)
if idx.AddReservedPortsForIP(a.ReservedPorts, a.Address) {
collide = true
}
}
}
// COMPAT(0.11): Remove in 0.11
// Handle reserving ports, handling both new and old
if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" {
@ -131,6 +167,11 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
collide = true
}
}
// Multi-interface TODO: handle upgrade path here?
if idx.AddReservedPorts(alloc.AllocatedResources.Shared.Ports) {
collide = true
}
} else {
// COMPAT(0.11): Remove in 0.11
for _, task := range alloc.TaskResources {
@ -151,18 +192,7 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
// if there is a port collision
func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
// Add the port usage
used := idx.UsedPorts[n.IP]
if used == nil {
// Try to get a bitmap from the pool, else create
raw := bitmapPool.Get()
if raw != nil {
used = raw.(Bitmap)
used.Clear()
} else {
used, _ = NewBitmap(maxValidPort)
}
idx.UsedPorts[n.IP] = used
}
used := idx.getUsedPortsFor(n.IP)
for _, ports := range [][]Port{n.ReservedPorts, n.DynamicPorts} {
for _, port := range ports {
@ -183,6 +213,22 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
return
}
func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool) {
for _, port := range ports {
used := idx.getUsedPortsFor(port.HostIP)
if port.Value < 0 || port.Value >= maxValidPort {
return true
}
if used.Check(uint(port.Value)) {
collide = true
} else {
used.Set(uint(port.Value))
}
}
return
}
// AddReservedPortRange marks the ports given as reserved on all network
// interfaces. The port format is comma delimited, with spans given as n1-n2
// (80,100-200,205)
@ -195,18 +241,7 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) {
// Ensure we create a bitmap for each available network
for _, n := range idx.AvailNetworks {
used := idx.UsedPorts[n.IP]
if used == nil {
// Try to get a bitmap from the pool, else create
raw := bitmapPool.Get()
if raw != nil {
used = raw.(Bitmap)
used.Clear()
} else {
used, _ = NewBitmap(maxValidPort)
}
idx.UsedPorts[n.IP] = used
}
idx.getUsedPortsFor(n.IP)
}
for _, used := range idx.UsedPorts {
@ -226,6 +261,30 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) {
return
}
// AddReservedPortsForIP
func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool) {
// Convert the ports into a slice of ints
resPorts, err := ParsePortRanges(ports)
if err != nil {
return
}
used := idx.getUsedPortsFor(ip)
for _, port := range resPorts {
// Guard against invalid port
if port < 0 || port >= maxValidPort {
return true
}
if used.Check(uint(port)) {
collide = true
} else {
used.Set(uint(port))
}
}
return
}
// yieldIP is used to iteratively invoke the callback with
// an available IP
func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool) {
@ -251,6 +310,95 @@ func (idx *NetworkIndex) yieldIP(cb func(net *NetworkResource, ip net.IP) bool)
}
}
func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) {
var offer AllocatedPorts
// index of host network name to slice of reserved ports, used during dynamic port assignment
reservedIdx := map[string][]Port{}
for _, port := range ask.ReservedPorts {
reservedIdx[port.HostNetwork] = append(reservedIdx[port.HostNetwork], port)
// allocPort is set in the inner for loop if a port mapping can be created
// if allocPort is still nil after the loop, the port wasn't available for reservation
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.AvailAddresses[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Guard against invalid port
if port.Value < 0 || port.Value >= maxValidPort {
return nil, fmt.Errorf("invalid port %d (out of range)", port.Value)
}
// Check if in use
if used != nil && used.Check(uint(port.Value)) {
addrErr = fmt.Errorf("reserved port collision")
continue
}
allocPort = &AllocatedPortMapping{
Label: port.Label,
Value: port.Value,
To: port.To,
HostIP: addr.Address,
}
break
}
if allocPort == nil {
if addrErr != nil {
return nil, addrErr
}
return nil, fmt.Errorf("no addresses available for %q network", port.HostNetwork)
}
offer = append(offer, *allocPort)
}
for _, port := range ask.DynamicPorts {
var allocPort *AllocatedPortMapping
var addrErr error
for _, addr := range idx.AvailAddresses[port.HostNetwork] {
used := idx.getUsedPortsFor(addr.Address)
// Try to stochastically pick the dynamic ports as it is faster and
// lower memory usage.
var dynPorts []int
// TODO: its more efficient to find multiple dynamic ports at once
dynPorts, addrErr = getDynamicPortsStochastic(used, reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
// Fall back to the precise method if the random sampling failed.
dynPorts, addrErr = getDynamicPortsPrecise(used, reservedIdx[port.HostNetwork], 1)
if addrErr != nil {
continue
}
}
allocPort = &AllocatedPortMapping{
Label: port.Label,
Value: dynPorts[0],
To: port.To,
HostIP: addr.Address,
}
if allocPort.To == -1 {
allocPort.To = allocPort.Value
}
break
}
if allocPort == nil {
if addrErr != nil {
return nil, addrErr
}
return nil, fmt.Errorf("no addresses availale for %q network", port.HostNetwork)
}
offer = append(offer, *allocPort)
}
return offer, nil
}
// AssignNetwork is used to assign network resources given an ask.
// If the ask cannot be satisfied, returns nil
func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResource, err error) {
@ -299,13 +447,13 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
// lower memory usage.
var dynPorts []int
var dynErr error
dynPorts, dynErr = getDynamicPortsStochastic(used, ask)
dynPorts, dynErr = getDynamicPortsStochastic(used, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr == nil {
goto BUILD_OFFER
}
// Fall back to the precise method if the random sampling failed.
dynPorts, dynErr = getDynamicPortsPrecise(used, ask)
dynPorts, dynErr = getDynamicPortsPrecise(used, ask.ReservedPorts, len(ask.DynamicPorts))
if dynErr != nil {
err = dynErr
return
@ -334,7 +482,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
// no ports have been allocated yet, the network ask and returns a set of unused
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// means the ask can not be satisfied as the method does a precise search.
func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
func getDynamicPortsPrecise(nodeUsed Bitmap, reserved []Port, numDyn int) ([]int, error) {
// Create a copy of the used ports and apply the new reserves
var usedSet Bitmap
var err error
@ -350,7 +498,7 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error
}
}
for _, port := range ask.ReservedPorts {
for _, port := range reserved {
usedSet.Set(uint(port.Value))
}
@ -358,7 +506,6 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error
availablePorts := usedSet.IndexesInRange(false, MinDynamicPort, MaxDynamicPort)
// Randomize the amount we need
numDyn := len(ask.DynamicPorts)
if len(availablePorts) < numDyn {
return nil, fmt.Errorf("dynamic port selection failed")
}
@ -377,13 +524,13 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, ask *NetworkResource) ([]int, error
// ports to fulfil the ask's DynamicPorts or an error if it failed. An error
// does not mean the ask can not be satisfied as the method has a fixed amount
// of random probes and if these fail, the search is aborted.
func getDynamicPortsStochastic(nodeUsed Bitmap, ask *NetworkResource) ([]int, error) {
func getDynamicPortsStochastic(nodeUsed Bitmap, reservedPorts []Port, count int) ([]int, error) {
var reserved, dynamic []int
for _, port := range ask.ReservedPorts {
for _, port := range reservedPorts {
reserved = append(reserved, port.Value)
}
for i := 0; i < len(ask.DynamicPorts); i++ {
for i := 0; i < count; i++ {
attempts := 0
PICK:
attempts++
@ -416,3 +563,23 @@ func isPortReserved(haystack []int, needle int) bool {
}
return false
}
// COMPAT(1.0) remove when NetworkResource is no longer used for materialized client view of ports
func AllocatedPortsToNetworkResouce(ask *NetworkResource, ports AllocatedPorts) *NetworkResource {
out := ask.Copy()
for i, port := range ask.DynamicPorts {
if p, ok := ports.Get(port.Label); ok {
out.DynamicPorts[i].Value = p.Value
out.DynamicPorts[i].To = p.To
}
}
return out
}
type ClientHostNetworkConfig struct {
Name string `hcl:",key"`
CIDR string `hcl:"cidr"`
Interface string `hcl:"interface"`
ReservedPorts string `hcl:"reserved_ports"`
}

View File

@ -10,6 +10,7 @@ import (
)
func TestNetworkIndex_Overcommitted(t *testing.T) {
t.Skip()
idx := NewNetworkIndex()
// Consume some network
@ -17,7 +18,7 @@ func TestNetworkIndex_Overcommitted(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 505,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
}
collide := idx.AddReserved(reserved)
if collide {
@ -98,7 +99,7 @@ func TestNetworkIndex_AddAllocs(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
},
},
},
@ -114,7 +115,7 @@ func TestNetworkIndex_AddAllocs(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []Port{{"one", 10000, 0}},
ReservedPorts: []Port{{"one", 10000, 0, ""}},
},
},
},
@ -148,7 +149,7 @@ func TestNetworkIndex_AddReserved(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
}
collide := idx.AddReserved(reserved)
if collide {
@ -226,7 +227,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
},
},
},
@ -240,7 +241,7 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []Port{{"main", 10000, 0}},
ReservedPorts: []Port{{"main", 10000, 0, ""}},
},
},
},
@ -251,19 +252,19 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
// Ask for a reserved port
ask := &NetworkResource{
ReservedPorts: []Port{{"main", 8000, 0}},
ReservedPorts: []Port{{"main", 8000, 0, ""}},
}
offer, err := idx.AssignNetwork(ask)
require.NoError(t, err)
require.NotNil(t, offer)
require.Equal(t, "192.168.0.101", offer.IP)
rp := Port{"main", 8000, 0}
rp := Port{"main", 8000, 0, ""}
require.Len(t, offer.ReservedPorts, 1)
require.Exactly(t, rp, offer.ReservedPorts[0])
// Ask for dynamic ports
ask = &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}, {"admin", 0, -1}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, -1, ""}},
}
offer, err = idx.AssignNetwork(ask)
require.NoError(t, err)
@ -281,15 +282,15 @@ func TestNetworkIndex_AssignNetwork(t *testing.T) {
// Ask for reserved + dynamic ports
ask = &NetworkResource{
ReservedPorts: []Port{{"main", 2345, 0}},
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}, {"admin", 0, 8080}},
ReservedPorts: []Port{{"main", 2345, 0, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
require.NoError(t, err)
require.NotNil(t, offer)
require.Equal(t, "192.168.0.100", offer.IP)
rp = Port{"main", 2345, 0}
rp = Port{"main", 2345, 0, ""}
require.Len(t, offer.ReservedPorts, 1)
require.Exactly(t, rp, offer.ReservedPorts[0])
@ -330,7 +331,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) {
// Ask for dynamic ports
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80}},
DynamicPorts: []Port{{"http", 0, 80, ""}},
}
offer, err := idx.AssignNetwork(ask)
if err != nil {
@ -350,49 +351,6 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention(t *testing.T) {
}
}
// COMPAT(0.11): Remove in 0.11
func TestNetworkIndex_Overcommitted_Old(t *testing.T) {
idx := NewNetworkIndex()
// Consume some network
reserved := &NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
MBits: 505,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
}
collide := idx.AddReserved(reserved)
if collide {
t.Fatalf("bad")
}
if !idx.Overcommitted() {
t.Fatalf("have no resources")
}
// Add resources
n := &Node{
Resources: &Resources{
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
}
idx.SetNode(n)
if idx.Overcommitted() {
t.Fatalf("have resources")
}
// Double up our usage
idx.AddReserved(reserved)
if !idx.Overcommitted() {
t.Fatalf("should be overcommitted")
}
}
// COMPAT(0.11): Remove in 0.11
func TestNetworkIndex_SetNode_Old(t *testing.T) {
idx := NewNetworkIndex()
@ -411,7 +369,7 @@ func TestNetworkIndex_SetNode_Old(t *testing.T) {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
MBits: 1,
},
},
@ -448,7 +406,7 @@ func TestNetworkIndex_AddAllocs_Old(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
},
},
},
@ -462,7 +420,7 @@ func TestNetworkIndex_AddAllocs_Old(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []Port{{"one", 10000, 0}},
ReservedPorts: []Port{{"one", 10000, 0, ""}},
},
},
},
@ -506,7 +464,7 @@ func TestNetworkIndex_yieldIP_Old(t *testing.T) {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
MBits: 1,
},
},
@ -545,7 +503,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
MBits: 1,
},
},
@ -562,7 +520,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 20,
ReservedPorts: []Port{{"one", 8000, 0}, {"two", 9000, 0}},
ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}},
},
},
},
@ -576,7 +534,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
ReservedPorts: []Port{{"main", 10000, 0}},
ReservedPorts: []Port{{"main", 10000, 0, ""}},
},
},
},
@ -587,7 +545,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
// Ask for a reserved port
ask := &NetworkResource{
ReservedPorts: []Port{{"main", 8000, 0}},
ReservedPorts: []Port{{"main", 8000, 0, ""}},
}
offer, err := idx.AssignNetwork(ask)
if err != nil {
@ -599,14 +557,14 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
if offer.IP != "192.168.0.101" {
t.Fatalf("bad: %#v", offer)
}
rp := Port{"main", 8000, 0}
rp := Port{"main", 8000, 0, ""}
if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != rp {
t.Fatalf("bad: %#v", offer)
}
// Ask for dynamic ports
ask = &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}, {"admin", 0, 8080}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
if err != nil {
@ -629,8 +587,8 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
// Ask for reserved + dynamic ports
ask = &NetworkResource{
ReservedPorts: []Port{{"main", 2345, 0}},
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}, {"admin", 0, 8080}},
ReservedPorts: []Port{{"main", 2345, 0, ""}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
}
offer, err = idx.AssignNetwork(ask)
if err != nil {
@ -643,7 +601,7 @@ func TestNetworkIndex_AssignNetwork_Old(t *testing.T) {
t.Fatalf("bad: %#v", offer)
}
rp = Port{"main", 2345, 0}
rp = Port{"main", 2345, 0, ""}
if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != rp {
t.Fatalf("bad: %#v", offer)
}
@ -696,7 +654,7 @@ func TestNetworkIndex_AssignNetwork_Dynamic_Contention_Old(t *testing.T) {
// Ask for dynamic ports
ask := &NetworkResource{
DynamicPorts: []Port{{"http", 0, 80}},
DynamicPorts: []Port{{"http", 0, 80, ""}},
}
offer, err := idx.AssignNetwork(ask)
if err != nil {

View File

@ -1811,6 +1811,30 @@ func (n *Node) Canonicalize() {
n.SchedulingEligibility = NodeSchedulingEligible
}
}
// COMPAT remove in 1.0
// In v0.12.0 we introduced a separate node specific network resource struct
// so we need to covert any pre 0.12 clients to the correct struct
if n.NodeResources != nil && n.NodeResources.NodeNetworks == nil {
if n.NodeResources.Networks != nil {
for _, nr := range n.NodeResources.Networks {
nnr := &NodeNetworkResource{
Mode: nr.Mode,
Speed: nr.MBits,
Device: nr.Device,
}
if nr.IP != "" {
nnr.Addresses = []NodeNetworkAddress{
{
Alias: "default",
Address: nr.IP,
},
}
}
n.NodeResources.NodeNetworks = append(n.NodeResources.NodeNetworks, nnr)
}
}
}
}
func (n *Node) Copy() *Node {
@ -2244,10 +2268,70 @@ func (r *Resources) GoString() string {
return fmt.Sprintf("*%#v", *r)
}
// NodeNetworkResource is used to describe a fingerprinted network of a node
type NodeNetworkResource struct {
Mode string // host for physical networks, cni/<name> for cni networks
// The following apply only to host networks
Device string // interface name
MacAddress string
Speed int
Addresses []NodeNetworkAddress // not valid for cni, for bridge there will only be 1 ip
}
func (n *NodeNetworkResource) Equals(o *NodeNetworkResource) bool {
return reflect.DeepEqual(n, o)
}
func (n *NodeNetworkResource) HasAlias(alias string) bool {
for _, addr := range n.Addresses {
if addr.Alias == alias {
return true
}
}
return false
}
type NodeNetworkAF string
const (
NodeNetworkAF_IPv4 NodeNetworkAF = "ipv4"
NodeNetworkAF_IPv6 NodeNetworkAF = "ipv6"
)
type NodeNetworkAddress struct {
Family NodeNetworkAF
Alias string
Address string
ReservedPorts string
Gateway string // default route for this address
}
type AllocatedPortMapping struct {
Label string
Value int
To int
HostIP string
}
type AllocatedPorts []AllocatedPortMapping
func (p AllocatedPorts) Get(label string) (AllocatedPortMapping, bool) {
for _, port := range p {
if port.Label == label {
return port, true
}
}
return AllocatedPortMapping{}, false
}
type Port struct {
Label string
Value int
To int
HostNetwork string
}
type DNSConfig struct {
@ -2297,6 +2381,17 @@ func (n *NetworkResource) Canonicalize() {
if len(n.DynamicPorts) == 0 {
n.DynamicPorts = nil
}
for i, p := range n.DynamicPorts {
if p.HostNetwork == "" {
n.DynamicPorts[i].HostNetwork = "default"
}
}
for i, p := range n.ReservedPorts {
if p.HostNetwork == "" {
n.ReservedPorts[i].HostNetwork = "default"
}
}
}
// MeetsMinResources returns an error if the resources specified are less than
@ -2527,6 +2622,7 @@ type NodeResources struct {
Memory NodeMemoryResources
Disk NodeDiskResources
Networks Networks
NodeNetworks []*NodeNetworkResource
Devices []*NodeDeviceResource
}
@ -2593,6 +2689,25 @@ func (n *NodeResources) Merge(o *NodeResources) {
if len(o.Devices) != 0 {
n.Devices = o.Devices
}
if len(o.NodeNetworks) != 0 {
lookupNetwork := func(nets []*NodeNetworkResource, name string) (int, *NodeNetworkResource) {
for i, nw := range nets {
if nw.Device == name {
return i, nw
}
}
return 0, nil
}
for _, nw := range o.NodeNetworks {
if i, nnw := lookupNetwork(n.NodeNetworks, nw.Device); nnw != nil {
n.NodeNetworks[i] = nw
} else {
n.NodeNetworks = append(n.NodeNetworks, nw)
}
}
}
}
func (n *NodeResources) Equals(o *NodeResources) bool {
@ -2622,6 +2737,10 @@ func (n *NodeResources) Equals(o *NodeResources) bool {
return false
}
if !NodeNetworksEquals(n.NodeNetworks, o.NodeNetworks) {
return false
}
return true
}
@ -2666,6 +2785,25 @@ func DevicesEquals(d1, d2 []*NodeDeviceResource) bool {
return true
}
func NodeNetworksEquals(n1, n2 []*NodeNetworkResource) bool {
if len(n1) != len(n2) {
return false
}
netMap := make(map[string]*NodeNetworkResource, len(n1))
for _, n := range n1 {
netMap[n.Device] = n
}
for _, otherN := range n2 {
if n, ok := netMap[otherN.Device]; !ok || !n.Equals(otherN) {
return false
}
}
return true
}
// NodeCpuResources captures the CPU resources of the node.
type NodeCpuResources struct {
// CpuShares is the CPU shares available. This is calculated by number of
@ -3275,6 +3413,7 @@ func (a *AllocatedTaskResources) Subtract(delta *AllocatedTaskResources) {
type AllocatedSharedResources struct {
Networks Networks
DiskMB int64
Ports AllocatedPorts
}
func (a AllocatedSharedResources) Copy() AllocatedSharedResources {
@ -5656,7 +5795,6 @@ func (tg *TaskGroup) validateNetworks() error {
var mErr multierror.Error
portLabels := make(map[string]string)
staticPorts := make(map[int]string)
mappedPorts := make(map[int]string)
for _, net := range tg.Networks {
for _, port := range append(net.ReservedPorts, net.DynamicPorts...) {
@ -5676,20 +5814,13 @@ func (tg *TaskGroup) validateNetworks() error {
}
}
if port.To > 0 {
if other, ok := mappedPorts[port.To]; ok {
err := fmt.Errorf("Port mapped to %d already in use by %s", port.To, other)
mErr.Errors = append(mErr.Errors, err)
} else {
mappedPorts[port.To] = fmt.Sprintf("taskgroup network:%s", port.Label)
}
} else if port.To < -1 {
if port.To < -1 {
err := fmt.Errorf("Port %q cannot be mapped to negative value %d", port.Label, port.To)
mErr.Errors = append(mErr.Errors, err)
}
}
}
// Check for duplicate tasks or port labels, and no duplicated static or mapped ports
// Check for duplicate tasks or port labels, and no duplicated static ports
for _, task := range tg.Tasks {
if task.Resources == nil {
continue
@ -5709,15 +5840,6 @@ func (tg *TaskGroup) validateNetworks() error {
staticPorts[port.Value] = fmt.Sprintf("%s:%s", task.Name, port.Label)
}
}
if port.To != 0 {
if other, ok := mappedPorts[port.To]; ok {
err := fmt.Errorf("Port mapped to %d already in use by %s", port.To, other)
mErr.Errors = append(mErr.Errors, err)
} else {
mappedPorts[port.To] = fmt.Sprintf("taskgroup network:%s", port.Label)
}
}
}
}
}

View File

@ -1019,7 +1019,7 @@ func TestTaskGroup_Validate(t *testing.T) {
tg = &TaskGroup{
Networks: []*NetworkResource{
{
DynamicPorts: []Port{{"http", 0, 80}},
DynamicPorts: []Port{{"http", 0, 80, ""}},
},
},
Tasks: []*Task{
@ -1027,7 +1027,7 @@ func TestTaskGroup_Validate(t *testing.T) {
Resources: &Resources{
Networks: []*NetworkResource{
{
DynamicPorts: []Port{{"http", 0, 80}},
DynamicPorts: []Port{{"http", 0, 80, ""}},
},
},
},
@ -1036,7 +1036,6 @@ func TestTaskGroup_Validate(t *testing.T) {
}
err = tg.Validate(j)
require.Contains(t, err.Error(), "Port label http already in use")
require.Contains(t, err.Error(), "Port mapped to 80 already in use")
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
@ -2325,7 +2324,7 @@ func TestResource_Add(t *testing.T) {
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
},
},
}
@ -2337,7 +2336,7 @@ func TestResource_Add(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
},
}
@ -2355,7 +2354,7 @@ func TestResource_Add(t *testing.T) {
{
CIDR: "10.0.0.0/8",
MBits: 150,
ReservedPorts: []Port{{"ssh", 22, 0}, {"web", 80, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}, {"web", 80, 0, ""}},
},
},
}
@ -2371,7 +2370,7 @@ func TestResource_Add_Network(t *testing.T) {
Networks: []*NetworkResource{
{
MBits: 50,
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}},
},
},
}
@ -2379,7 +2378,7 @@ func TestResource_Add_Network(t *testing.T) {
Networks: []*NetworkResource{
{
MBits: 25,
DynamicPorts: []Port{{"admin", 0, 8080}},
DynamicPorts: []Port{{"admin", 0, 8080, ""}},
},
},
}
@ -2397,7 +2396,7 @@ func TestResource_Add_Network(t *testing.T) {
Networks: []*NetworkResource{
{
MBits: 75,
DynamicPorts: []Port{{"http", 0, 80}, {"https", 0, 443}, {"admin", 0, 8080}},
DynamicPorts: []Port{{"http", 0, 80, ""}, {"https", 0, 443, ""}, {"admin", 0, 8080, ""}},
},
},
}
@ -2420,7 +2419,7 @@ func TestComparableResources_Subtract(t *testing.T) {
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
},
},
},
@ -2441,7 +2440,7 @@ func TestComparableResources_Subtract(t *testing.T) {
{
CIDR: "10.0.0.0/8",
MBits: 20,
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
},
},
},
@ -2463,7 +2462,7 @@ func TestComparableResources_Subtract(t *testing.T) {
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22, 0}},
ReservedPorts: []Port{{"ssh", 22, 0, ""}},
},
},
},
@ -4807,12 +4806,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
},
true,
@ -4823,12 +4822,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.0",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
},
false,
@ -4839,12 +4838,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 40,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
},
false,
@ -4855,12 +4854,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}, {"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}, {"web", 80, 0, ""}},
},
},
false,
@ -4871,7 +4870,7 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
@ -4887,12 +4886,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"web", 80, 0}},
ReservedPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"notweb", 80, 0}},
ReservedPorts: []Port{{"notweb", 80, 0, ""}},
},
},
false,
@ -4903,12 +4902,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
DynamicPorts: []Port{{"web", 80, 0}},
DynamicPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
DynamicPorts: []Port{{"web", 80, 0}, {"web", 80, 0}},
DynamicPorts: []Port{{"web", 80, 0, ""}, {"web", 80, 0, ""}},
},
},
false,
@ -4919,7 +4918,7 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
DynamicPorts: []Port{{"web", 80, 0}},
DynamicPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
@ -4935,12 +4934,12 @@ func TestNetworkResourcesEquals(t *testing.T) {
{
IP: "10.0.0.1",
MBits: 50,
DynamicPorts: []Port{{"web", 80, 0}},
DynamicPorts: []Port{{"web", 80, 0, ""}},
},
{
IP: "10.0.0.1",
MBits: 50,
DynamicPorts: []Port{{"notweb", 80, 0}},
DynamicPorts: []Port{{"notweb", 80, 0, ""}},
},
},
false,

View File

@ -319,23 +319,60 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
type NetworkChecker struct {
ctx Context
networkMode string
ports []structs.Port
}
func NewNetworkChecker(ctx Context) *NetworkChecker {
return &NetworkChecker{ctx: ctx, networkMode: "host"}
}
func (c *NetworkChecker) SetNetworkMode(netMode string) {
c.networkMode = netMode
func (c *NetworkChecker) SetNetwork(network *structs.NetworkResource) {
c.networkMode = network.Mode
if c.networkMode == "" {
c.networkMode = "host"
}
c.ports = make([]structs.Port, len(network.DynamicPorts)+len(network.ReservedPorts))
for _, port := range network.DynamicPorts {
c.ports = append(c.ports, port)
}
for _, port := range network.ReservedPorts {
c.ports = append(c.ports, port)
}
}
func (c *NetworkChecker) Feasible(option *structs.Node) bool {
if c.hasNetwork(option) {
return true
}
if !c.hasNetwork(option) {
c.ctx.Metrics().FilterNode(option, "missing network")
return false
}
if c.ports != nil {
if !c.hasHostNetworks(option) {
return false
}
}
return true
}
func (c *NetworkChecker) hasHostNetworks(option *structs.Node) bool {
for _, port := range c.ports {
if port.HostNetwork != "" {
found := false
for _, net := range option.NodeResources.NodeNetworks {
if net.HasAlias(port.HostNetwork) {
found = true
break
}
}
if !found {
c.ctx.Metrics().FilterNode(option, fmt.Sprintf("missing host network %q for port %q", port.HostNetwork, port.Label))
return false
}
}
}
return true
}
func (c *NetworkChecker) hasNetwork(option *structs.Node) bool {

View File

@ -410,31 +410,31 @@ func TestNetworkChecker(t *testing.T) {
checker := NewNetworkChecker(ctx)
cases := []struct {
mode string
network *structs.NetworkResource
results []bool
}{
{
mode: "host",
network: &structs.NetworkResource{Mode: "host"},
results: []bool{true, true, true},
},
{
mode: "bridge",
network: &structs.NetworkResource{Mode: "bridge"},
results: []bool{true, true, false},
},
{
mode: "cni/mynet",
network: &structs.NetworkResource{Mode: "cni/mynet"},
results: []bool{false, false, true},
},
{
mode: "cni/nonexistent",
network: &structs.NetworkResource{Mode: "cni/nonexistent"},
results: []bool{false, false, false},
},
}
for _, c := range cases {
checker.SetNetworkMode(c.mode)
checker.SetNetwork(c.network)
for i, node := range nodes {
require.Equal(t, c.results[i], checker.Feasible(node), "mode=%q, idx=%d", c.mode, i)
require.Equal(t, c.results[i], checker.Feasible(node), "mode=%q, idx=%d", c.network.Mode, i)
}
}
}

View File

@ -500,6 +500,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
}
if option.AllocResources != nil {
resources.Shared.Networks = option.AllocResources.Networks
resources.Shared.Ports = option.AllocResources.Ports
}
// Create an allocation for this

View File

@ -242,8 +242,8 @@ OUTER:
// Check if we need task group network resource
if len(iter.taskGroup.Networks) > 0 {
ask := iter.taskGroup.Networks[0].Copy()
offer, err := netIdx.AssignNetwork(ask)
if offer == nil {
offer, err := netIdx.AssignPorts(ask)
if err != nil {
// If eviction is not enabled, mark this node as exhausted and continue
if !iter.evict {
iter.ctx.Metrics().ExhaustedNode(option.Node,
@ -272,8 +272,8 @@ OUTER:
netIdx.SetNode(option.Node)
netIdx.AddAllocs(proposed)
offer, err = netIdx.AssignNetwork(ask)
if offer == nil {
offer, err = netIdx.AssignPorts(ask)
if err != nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
netIdx.Release()
continue OUTER
@ -281,13 +281,15 @@ OUTER:
}
// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)
netIdx.AddReservedPorts(offer)
// Update the network ask to the offer
total.Shared.Networks = []*structs.NetworkResource{offer}
nwRes := structs.AllocatedPortsToNetworkResouce(ask, offer)
total.Shared.Networks = []*structs.NetworkResource{nwRes}
option.AllocResources = &structs.AllocatedSharedResources{
Networks: []*structs.NetworkResource{offer},
Networks: []*structs.NetworkResource{nwRes},
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
Ports: offer,
}
}

View File

@ -373,6 +373,8 @@ func TestBinPackIterator_Network_Success(t *testing.T) {
// Tests that bin packing iterator fails due to overprovisioning of network
// This test has network resources at task group and task level
func TestBinPackIterator_Network_Failure(t *testing.T) {
// Bandwidth tracking is deprecated
t.Skip()
_, ctx := testContext(t)
nodes := []*RankedNode{
{

View File

@ -137,7 +137,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
s.taskGroupCSIVolumes.SetVolumes(tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetworkMode(tg.Networks[0].Mode)
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
}
s.distinctHostsConstraint.SetTaskGroup(tg)
s.distinctPropertyConstraint.SetTaskGroup(tg)

View File

@ -87,6 +87,7 @@ type Port struct {
Label string
Value int `mapstructure:"static"`
To int `mapstructure:"to"`
HostNetwork string `mapstructure:"host_network"`
}
type DNSConfig struct {

1
vendor/modules.txt vendored
View File

@ -179,6 +179,7 @@ github.com/coreos/pkg/dlopen
## explicit
github.com/cyphar/filepath-securejoin
# github.com/davecgh/go-spew v1.1.1
## explicit
github.com/davecgh/go-spew/spew
# github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba
github.com/denverdino/aliyungo/common