diff --git a/client/allocrunner/network_hook.go b/client/allocrunner/network_hook.go
index d06a1da47..bda1efc24 100644
--- a/client/allocrunner/network_hook.go
+++ b/client/allocrunner/network_hook.go
@@ -44,7 +44,7 @@ func (h *networkHook) Name() string {
 
 func (h *networkHook) Prerun() error {
 	if h.manager == nil {
-		h.logger.Debug("shared network namespaces are not supported on this platform, skipping network hook")
+		h.logger.Trace("shared network namespaces are not supported on this platform, skipping network hook")
 		return nil
 	}
 	tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
@@ -62,6 +62,9 @@ func (h *networkHook) Prerun() error {
 		h.setter.SetNetworkIsolation(spec)
 	}
 
+	if err := ConfigureNetworking(h.alloc, spec); err != nil {
+		return fmt.Errorf("failed to configure networking for alloc: %v", err)
+	}
 	return nil
 }
 
@@ -70,5 +73,8 @@ func (h *networkHook) Postrun() error {
 		return nil
 	}
 
+	if err := CleanupNetworking(h.alloc, h.spec); err != nil {
+		h.logger.Error("failed to cleanup network for allocation, resources may have leaked", "alloc", h.alloc.ID, "error", err)
+	}
 	return h.manager.DestroyNetwork(h.alloc.ID, h.spec)
 }
diff --git a/client/allocrunner/network_manager_linux.go b/client/allocrunner/network_manager_linux.go
index b37ec0f39..9d9bd62b6 100644
--- a/client/allocrunner/network_manager_linux.go
+++ b/client/allocrunner/network_manager_linux.go
@@ -118,3 +118,51 @@ func netModeToIsolationMode(netMode string) drivers.NetIsolationMode {
 		return drivers.NetIsolationModeHost
 	}
 }
+
+// getPortMapping flattens the alloc's shared network ports into CNI port
+// mappings, one entry per port per protocol (tcp and udp).
+func getPortMapping(alloc *structs.Allocation) []*nsutil.PortMapping {
+	ports := []*nsutil.PortMapping{}
+
+	// Walk the dynamic and reserved port lists separately rather than
+	// using append(dynamic, reserved...), which can write into the
+	// shared backing array of DynamicPorts when it has spare capacity.
+	collect := func(group []structs.Port) {
+		for _, port := range group {
+			for _, proto := range []string{"tcp", "udp"} {
+				ports = append(ports, &nsutil.PortMapping{
+					Host:      port.Value,
+					Container: port.To,
+					Proto:     proto,
+				})
+			}
+		}
+	}
+
+	for _, network := range alloc.AllocatedResources.Shared.Networks {
+		collect(network.DynamicPorts)
+		collect(network.ReservedPorts)
+	}
+	return ports
+}
+
+// ConfigureNetworking sets up bridge networking and port forwarding inside
+// the alloc's network namespace. It is a no-op when the alloc has no
+// network isolation spec.
+func ConfigureNetworking(alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
+	if spec == nil {
+		return nil
+	}
+	// TODO: full CNI support; only the built-in bridge config is used.
+	return nsutil.SetupBridgeNetworking(alloc.ID, spec.Path, getPortMapping(alloc))
+}
+
+// CleanupNetworking tears down the bridge networking configured by
+// ConfigureNetworking. It is a no-op when the alloc has no network
+// isolation spec.
+func CleanupNetworking(alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error {
+	if spec == nil {
+		return nil
+	}
+	return nsutil.TeardownBridgeNetworking(alloc.ID, spec.Path, getPortMapping(alloc))
+}
diff --git a/client/lib/nsutil/bridge_linux.go b/client/lib/nsutil/bridge_linux.go
new file mode 100644
index 000000000..33e6d5f41
--- /dev/null
+++ b/client/lib/nsutil/bridge_linux.go
@@ -0,0 +1,113 @@
+package nsutil
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/containernetworking/cni/libcni"
+)
+
+const (
+	// EnvCNIPath names the environment variable listing the directories
+	// searched for CNI plugin binaries.
+	EnvCNIPath = "CNI_PATH"
+)
+
+// PortMapping mirrors the capability argument understood by the CNI
+// portmap plugin.
+type PortMapping struct {
+	Host      int    `json:"hostPort"`
+	Container int    `json:"containerPort"`
+	Proto     string `json:"protocol"`
+}
+
+// cniRuntime builds the parsed network config, the plugin locator, and the
+// per-alloc runtime conf shared by setup and teardown so both sides always
+// agree on container ID, interface name, and capability args.
+func cniRuntime(allocID, nsPath string, portMappings []*PortMapping) (*libcni.NetworkConfigList, *libcni.CNIConfig, *libcni.RuntimeConf, error) {
+	netconf, err := libcni.ConfListFromBytes([]byte(nomadCNIConfig))
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	// Container IDs only need to be unique per host; use a shortened
+	// alloc ID, tolerating unexpectedly short IDs instead of panicking.
+	short := allocID
+	if len(short) > 8 {
+		short = short[:8]
+	}
+
+	cninet := libcni.NewCNIConfig(filepath.SplitList(os.Getenv(EnvCNIPath)), nil)
+	rt := &libcni.RuntimeConf{
+		ContainerID: fmt.Sprintf("nomad-%s", short),
+		NetNS:       nsPath,
+		IfName:      "eth0",
+		CapabilityArgs: map[string]interface{}{
+			"portMappings": portMappings,
+		},
+	}
+	return netconf, cninet, rt, nil
+}
+
+// SetupBridgeNetworking joins the network namespace at nsPath to the nomad
+// bridge and installs the requested port forwards.
+func SetupBridgeNetworking(allocID string, nsPath string, portMappings []*PortMapping) error {
+	netconf, cninet, rt, err := cniRuntime(allocID, nsPath, portMappings)
+	if err != nil {
+		return err
+	}
+
+	result, err := cninet.AddNetworkList(context.TODO(), netconf, rt)
+	if result != nil {
+		// TODO: persist the CNI result in alloc state instead of
+		// printing it to stdout.
+		result.Print()
+	}
+	return err
+}
+
+// TeardownBridgeNetworking removes the bridge attachment and port forwards
+// created by SetupBridgeNetworking.
+func TeardownBridgeNetworking(allocID, nsPath string, portMappings []*PortMapping) error {
+	netconf, cninet, rt, err := cniRuntime(allocID, nsPath, portMappings)
+	if err != nil {
+		return err
+	}
+	return cninet.DelNetworkList(context.TODO(), netconf, rt)
+}
+
+// nomadCNIConfig is the built-in bridge network configuration: a bridge
+// plugin with host-local IPAM, a firewall plugin, and a portmap plugin for
+// forwarding host ports into the namespace.
+const nomadCNIConfig = `{
+	"cniVersion": "0.4.0",
+	"name": "nomad",
+	"plugins": [
+		{
+			"type": "bridge",
+			"bridge": "nomad",
+			"isDefaultGateway": true,
+			"ipMasq": true,
+			"ipam": {
+				"type": "host-local",
+				"ranges": [
+					[
+						{
+							"subnet": "172.26.66.0/23"
+						}
+					]
+				]
+			}
+		},
+		{
+			"type": "firewall"
+		},
+		{
+			"type": "portmap",
+			"capabilities": {"portMappings": true}
+		}
+	]
+}
+`
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 23e0745be..0dbda31c5 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -484,10 +484,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 		// Set fields based on if we found an allocation option
 		if option != nil {
 			resources := &structs.AllocatedResources{
-				Tasks: option.TaskResources,
-				Shared: structs.AllocatedSharedResources{
-					DiskMB: int64(tg.EphemeralDisk.SizeMB),
-				},
+				Tasks:  option.TaskResources,
+				Shared: *option.GroupResources,
 			}
 
 			// Create an allocation for this
@@ -507,7 +505,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 				DesiredStatus: structs.AllocDesiredStatusRun,
 				ClientStatus:  structs.AllocClientStatusPending,
 				SharedResources: &structs.Resources{
-					DiskMB: tg.EphemeralDisk.SizeMB,
+					DiskMB:   tg.EphemeralDisk.SizeMB,
+					Networks: tg.Networks,
 				},
 			}
 
diff --git a/scheduler/rank.go b/scheduler/rank.go
index a35c691a0..b151f1fca 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -17,10 +17,11 @@ const (
 // along with a node when iterating. This state can be modified as
 // various rank methods are applied.
 type RankedNode struct {
-	Node          *structs.Node
-	FinalScore    float64
-	Scores        []float64
-	TaskResources map[string]*structs.AllocatedTaskResources
+	Node           *structs.Node
+	FinalScore     float64
+	Scores         []float64
+	TaskResources  map[string]*structs.AllocatedTaskResources
+	GroupResources *structs.AllocatedSharedResources
 
 	// Allocs is used to cache the proposed allocations on the
 	// node. This can be shared between iterators that require it.
@@ -224,6 +225,64 @@ OUTER:
 		}
 		preemptor.SetPreemptions(currentPreemptions)
 
+		// Always populate the group-level shared resources so downstream
+		// consumers (computePlacements, genericAllocUpdateFn) can
+		// dereference GroupResources even when the task group requests
+		// no networks.
+		option.GroupResources = &structs.AllocatedSharedResources{
+			DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
+		}
+
+		// Check if we need task group network resource
+		if len(iter.taskGroup.Networks) > 0 {
+			ask := iter.taskGroup.Networks[0].Copy()
+			offer, err := netIdx.AssignNetwork(ask)
+			if offer == nil {
+				// If eviction is not enabled, mark this node as exhausted and continue
+				if !iter.evict {
+					iter.ctx.Metrics().ExhaustedNode(option.Node,
+						fmt.Sprintf("network: %s", err))
+					netIdx.Release()
+					continue OUTER
+				}
+
+				// Look for preemptible allocations to satisfy the network resource for this task
+				preemptor.SetCandidates(proposed)
+
+				netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
+				if netPreemptions == nil {
+					iter.ctx.Logger().Named("binpack").Error("preemption not possible", "network_resource", ask)
+					netIdx.Release()
+					continue OUTER
+				}
+				allocsToPreempt = append(allocsToPreempt, netPreemptions...)
+
+				// First subtract out preempted allocations
+				proposed = structs.RemoveAllocs(proposed, netPreemptions)
+
+				// Reset the network index and try the offer again
+				netIdx.Release()
+				netIdx = structs.NewNetworkIndex()
+				netIdx.SetNode(option.Node)
+				netIdx.AddAllocs(proposed)
+
+				offer, err = netIdx.AssignNetwork(ask)
+				if offer == nil {
+					iter.ctx.Logger().Named("binpack").Error("unexpected error, unable to create network offer after considering preemption", "error", err)
+					netIdx.Release()
+					continue OUTER
+				}
+			}
+
+			// Reserve this to prevent another task from colliding
+			netIdx.AddReserved(offer)
+
+			// Record the offered network both in the total and in the
+			// group shared resources handed back to the scheduler.
+			total.Shared.Networks = []*structs.NetworkResource{offer}
+			option.GroupResources.Networks = []*structs.NetworkResource{offer}
+		}
+
 		for _, task := range iter.taskGroup.Tasks {
 			// Allocate the resources
 			taskResources := &structs.AllocatedTaskResources{
diff --git a/scheduler/util.go b/scheduler/util.go
index 72015fdca..f21575d36 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -835,10 +835,8 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 		newAlloc.Job = nil       // Use the Job in the Plan
 		newAlloc.Resources = nil // Computed in Plan Apply
 		newAlloc.AllocatedResources = &structs.AllocatedResources{
-			Tasks: option.TaskResources,
-			Shared: structs.AllocatedSharedResources{
-				DiskMB: int64(newTG.EphemeralDisk.SizeMB),
-			},
+			Tasks:  option.TaskResources,
+			Shared: *option.GroupResources,
 		}
 
 		// Use metrics from existing alloc for in place upgrade