networking: Add new bridge networking mode implementation
This commit is contained in:
parent
4501fe3c4d
commit
af66a35924
|
@ -44,7 +44,7 @@ func (h *networkHook) Name() string {
|
|||
|
||||
func (h *networkHook) Prerun() error {
|
||||
if h.manager == nil {
|
||||
h.logger.Debug("shared network namespaces are not supported on this platform, skipping network hook")
|
||||
h.logger.Trace("shared network namespaces are not supported on this platform, skipping network hook")
|
||||
return nil
|
||||
}
|
||||
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
|
||||
|
@ -62,6 +62,9 @@ func (h *networkHook) Prerun() error {
|
|||
h.setter.SetNetworkIsolation(spec)
|
||||
}
|
||||
|
||||
if err := ConfigureNetworking(h.alloc, spec); err != nil {
|
||||
return fmt.Errorf("failed to configure networking for alloc: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -70,5 +73,8 @@ func (h *networkHook) Postrun() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := CleanupNetworking(h.alloc, h.spec); err != nil {
|
||||
h.logger.Error("failed to cleanup network for allocation, resources may have leaked", "alloc", h.alloc.ID, "error", err)
|
||||
}
|
||||
return h.manager.DestroyNetwork(h.alloc.ID, h.spec)
|
||||
}
|
||||
|
|
|
@ -118,3 +118,38 @@ func netModeToIsolationMode(netMode string) drivers.NetIsolationMode {
|
|||
return drivers.NetIsolationModeHost
|
||||
}
|
||||
}
|
||||
|
||||
func getPortMapping(alloc *structs.Allocation) []*nsutil.PortMapping {
|
||||
ports := []*nsutil.PortMapping{}
|
||||
for _, network := range alloc.AllocatedResources.Shared.Networks {
|
||||
for _, port := range append(network.DynamicPorts, network.ReservedPorts...) {
|
||||
for _, proto := range []string{"tcp", "udp"} {
|
||||
ports = append(ports, &nsutil.PortMapping{
|
||||
Host: port.Value,
|
||||
Container: port.To,
|
||||
Proto: proto,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return ports
|
||||
}
|
||||
|
||||
func ConfigureNetworking(alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
|
||||
|
||||
// TODO: CNI support
|
||||
if err := nsutil.SetupBridgeNetworking(alloc.ID, spec.Path, getPortMapping(alloc)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func CleanupNetworking(alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
|
||||
if err := nsutil.TeardownBridgeNetworking(alloc.ID, spec.Path, getPortMapping(alloc)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
package nsutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/containernetworking/cni/libcni"
|
||||
)
|
||||
|
||||
const (
	// EnvCNIPath names the environment variable holding the list of
	// directories searched for CNI plugin binaries; the value is split
	// with filepath.SplitList (PATH-style).
	EnvCNIPath = "CNI_PATH"
)
|
||||
|
||||
// PortMapping describes one host-to-container port forward. It is passed
// to the CNI portmap plugin through capability args, and the JSON tags
// match the schema that plugin expects.
type PortMapping struct {
	Host      int    `json:"hostPort"`      // port on the host
	Container int    `json:"containerPort"` // port inside the network namespace
	Proto     string `json:"protocol"`      // "tcp" or "udp"
}
|
||||
|
||||
func SetupBridgeNetworking(allocID string, nsPath string, portMappings []*PortMapping) error {
|
||||
netconf, err := libcni.ConfListFromBytes([]byte(nomadCNIConfig))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
containerID := fmt.Sprintf("nomad-%s", allocID[:8])
|
||||
cninet := libcni.NewCNIConfig(filepath.SplitList(os.Getenv(EnvCNIPath)), nil)
|
||||
|
||||
rt := &libcni.RuntimeConf{
|
||||
ContainerID: containerID,
|
||||
NetNS: nsPath,
|
||||
IfName: "eth0",
|
||||
CapabilityArgs: map[string]interface{}{
|
||||
"portMappings": portMappings,
|
||||
},
|
||||
}
|
||||
|
||||
result, err := cninet.AddNetworkList(context.TODO(), netconf, rt)
|
||||
if result != nil {
|
||||
result.Print()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func TeardownBridgeNetworking(allocID, nsPath string, portMappings []*PortMapping) error {
|
||||
netconf, err := libcni.ConfListFromBytes([]byte(nomadCNIConfig))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containerID := fmt.Sprintf("nomad-%s", allocID[:8])
|
||||
cninet := libcni.NewCNIConfig(filepath.SplitList(os.Getenv(EnvCNIPath)), nil)
|
||||
rt := &libcni.RuntimeConf{
|
||||
ContainerID: containerID,
|
||||
NetNS: nsPath,
|
||||
IfName: "eth0",
|
||||
CapabilityArgs: map[string]interface{}{
|
||||
"portMappings": portMappings,
|
||||
},
|
||||
}
|
||||
err = cninet.DelNetworkList(context.TODO(), netconf, rt)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// nomadCNIConfig is the embedded CNI network configuration used for bridge
// networking: a "bridge" plugin (default gateway, IP masquerade) with
// host-local IPAM on 172.26.66.0/23, a "firewall" plugin, and a "portmap"
// plugin that consumes the portMappings capability args.
const nomadCNIConfig = `{
"cniVersion": "0.4.0",
"name": "nomad",
"plugins": [
{
"type": "bridge",
"bridge": "nomad",
"isDefaultGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"ranges": [
[
{
"subnet": "172.26.66.0/23"
}
]
]
}
},
{
"type": "firewall"
},
{
"type": "portmap",
"capabilities": {"portMappings": true}
}
]
}
`
|
|
@ -484,10 +484,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
|
|||
// Set fields based on if we found an allocation option
|
||||
if option != nil {
|
||||
resources := &structs.AllocatedResources{
|
||||
Tasks: option.TaskResources,
|
||||
Shared: structs.AllocatedSharedResources{
|
||||
DiskMB: int64(tg.EphemeralDisk.SizeMB),
|
||||
},
|
||||
Tasks: option.TaskResources,
|
||||
Shared: *option.GroupResources,
|
||||
}
|
||||
|
||||
// Create an allocation for this
|
||||
|
@ -507,7 +505,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
|
|||
DesiredStatus: structs.AllocDesiredStatusRun,
|
||||
ClientStatus: structs.AllocClientStatusPending,
|
||||
SharedResources: &structs.Resources{
|
||||
DiskMB: tg.EphemeralDisk.SizeMB,
|
||||
DiskMB: tg.EphemeralDisk.SizeMB,
|
||||
Networks: tg.Networks,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -17,10 +17,11 @@ const (
|
|||
// along with a node when iterating. This state can be modified as
|
||||
// various rank methods are applied.
|
||||
type RankedNode struct {
|
||||
Node *structs.Node
|
||||
FinalScore float64
|
||||
Scores []float64
|
||||
TaskResources map[string]*structs.AllocatedTaskResources
|
||||
Node *structs.Node
|
||||
FinalScore float64
|
||||
Scores []float64
|
||||
TaskResources map[string]*structs.AllocatedTaskResources
|
||||
GroupResources *structs.AllocatedSharedResources
|
||||
|
||||
// Allocs is used to cache the proposed allocations on the
|
||||
// node. This can be shared between iterators that require it.
|
||||
|
@ -224,6 +225,59 @@ OUTER:
|
|||
}
|
||||
preemptor.SetPreemptions(currentPreemptions)
|
||||
|
||||
// Check if we need task group network resource
|
||||
if len(iter.taskGroup.Networks) > 0 {
|
||||
ask := iter.taskGroup.Networks[0].Copy()
|
||||
offer, err := netIdx.AssignNetwork(ask)
|
||||
if offer == nil {
|
||||
// If eviction is not enabled, mark this node as exhausted and continue
|
||||
if !iter.evict {
|
||||
iter.ctx.Metrics().ExhaustedNode(option.Node,
|
||||
fmt.Sprintf("network: %s", err))
|
||||
netIdx.Release()
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
// Look for preemptible allocations to satisfy the network resource for this task
|
||||
preemptor.SetCandidates(proposed)
|
||||
|
||||
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
|
||||
if netPreemptions == nil {
|
||||
iter.ctx.Logger().Named("binpack").Error("preemption not possible ", "network_resource", ask)
|
||||
netIdx.Release()
|
||||
continue OUTER
|
||||
}
|
||||
allocsToPreempt = append(allocsToPreempt, netPreemptions...)
|
||||
|
||||
// First subtract out preempted allocations
|
||||
proposed = structs.RemoveAllocs(proposed, netPreemptions)
|
||||
|
||||
// Reset the network index and try the offer again
|
||||
netIdx.Release()
|
||||
netIdx = structs.NewNetworkIndex()
|
||||
netIdx.SetNode(option.Node)
|
||||
netIdx.AddAllocs(proposed)
|
||||
|
||||
offer, err = netIdx.AssignNetwork(ask)
|
||||
if offer == nil {
|
||||
iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create network offer after considering preemption", "error", err)
|
||||
netIdx.Release()
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// Reserve this to prevent another task from colliding
|
||||
netIdx.AddReserved(offer)
|
||||
|
||||
// Update the network ask to the offer
|
||||
total.Shared.Networks = []*structs.NetworkResource{offer}
|
||||
option.GroupResources = &structs.AllocatedSharedResources{
|
||||
Networks: []*structs.NetworkResource{offer},
|
||||
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for _, task := range iter.taskGroup.Tasks {
|
||||
// Allocate the resources
|
||||
taskResources := &structs.AllocatedTaskResources{
|
||||
|
|
|
@ -835,10 +835,8 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
|
|||
newAlloc.Job = nil // Use the Job in the Plan
|
||||
newAlloc.Resources = nil // Computed in Plan Apply
|
||||
newAlloc.AllocatedResources = &structs.AllocatedResources{
|
||||
Tasks: option.TaskResources,
|
||||
Shared: structs.AllocatedSharedResources{
|
||||
DiskMB: int64(newTG.EphemeralDisk.SizeMB),
|
||||
},
|
||||
Tasks: option.TaskResources,
|
||||
Shared: *option.GroupResources,
|
||||
}
|
||||
// Use metrics from existing alloc for in place upgrade
|
||||
// This is because if the inplace upgrade succeeded, any scoring metadata from
|
||||
|
|
Loading…
Reference in New Issue