Implemented port labeling and driver configs

This commit is contained in:
Diptanu Choudhury 2015-11-13 18:09:42 -08:00 committed by Alex Dadgar
parent 0126c80961
commit 4e05b27111
12 changed files with 208 additions and 165 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
type DockerDriver struct {
@ -23,6 +24,28 @@ type DockerDriver struct {
fingerprint.StaticFingerprinter
}
// dockerDriverConfig is the docker driver's task configuration, decoded
// from the task's Config map with mapstructure.WeakDecode in Start.
type dockerDriverConfig struct {
	ImageName     string         `mapstructure:"image"`               // image to run
	Command       string         `mapstructure:"command"`             // optional command overriding the image entrypoint
	Args          string         `mapstructure:"args"`                // arguments passed to the command
	NetworkMode   string         `mapstructure:"network_mode"`        // docker network mode; defaults to "bridge" when empty
	PortMap       map[string]int `mapstructure:"port_map"`            // port label -> container port mapping
	UserName      string         `mapstructure:"auth.username"`       // registry auth user name
	Password      string         `mapstructure:"auth.password"`       // registry auth password (closing quote was missing; key was never decoded)
	Email         string         `mapstructure:"auth.email"`          // registry auth email
	ServerAddress string         `mapstructure:"auth.server_address"` // registry address (closing quote was missing; key was never decoded)
	Privileged    bool           `mapstructure:"privileged"`          // run the container in privileged mode
	DNS           string         `mapstructure:"dns_server"`          // comma separated list of DNS server IPs
	SearchDomains string         `mapstructure:"search_domains"`      // comma separated list of DNS search domains
}

// Validate verifies that the mandatory fields of the docker driver
// configuration are present; currently only the image name is required.
func (c *dockerDriverConfig) Validate() error {
	if c.ImageName == "" {
		return fmt.Errorf("Docker Driver needs an image name")
	}
	return nil
}
type dockerPID struct {
ImageID string
ContainerID string
@ -116,7 +139,7 @@ func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Ta
}
// createContainer initializes a struct needed to call docker.client.CreateContainer()
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (docker.CreateContainerOptions, error) {
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, driverConfig *dockerDriverConfig) (docker.CreateContainerOptions, error) {
var c docker.CreateContainerOptions
if task.Resources == nil {
d.logger.Printf("[ERR] driver.docker: task.Resources is empty")
@ -134,8 +157,7 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
config := &docker.Config{
Env: env.List(),
Image: task.Config["image"],
Image: driverConfig.ImageName,
}
hostConfig := &docker.HostConfig{
@ -184,11 +206,7 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
return c, fmt.Errorf("Unable to parse docker.privileged.enabled: %s", err)
}
if v, ok := task.Config["privileged"]; ok {
taskPrivileged, err := strconv.ParseBool(v)
if err != nil {
return c, fmt.Errorf("Unable to parse boolean value from task config option 'privileged': %v", err)
}
if taskPrivileged := driverConfig.Privileged; taskPrivileged {
if taskPrivileged && !hostPrivileged {
return c, fmt.Errorf(`Unable to set privileged flag since "docker.privileged.enabled" is false`)
}
@ -197,9 +215,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
}
// set DNS servers
dns, ok := task.Config["dns-servers"]
dns := driverConfig.DNS
if ok && dns != "" {
if dns != "" {
for _, v := range strings.Split(dns, ",") {
ip := strings.TrimSpace(v)
if net.ParseIP(ip) != nil {
@ -211,16 +229,16 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
}
// set DNS search domains
dnsSearch, ok := task.Config["search-domains"]
dnsSearch := driverConfig.SearchDomains
if ok && dnsSearch != "" {
if dnsSearch != "" {
for _, v := range strings.Split(dnsSearch, ",") {
hostConfig.DNSSearch = append(hostConfig.DNSSearch, strings.TrimSpace(v))
}
}
mode, ok := task.Config["network_mode"]
if !ok || mode == "" {
mode := driverConfig.NetworkMode
if mode == "" {
// docker default
d.logger.Printf("[WARN] driver.docker: no mode specified for networking, defaulting to bridge")
mode = "bridge"
@ -245,45 +263,38 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
publishedPorts := map[docker.Port][]docker.PortBinding{}
exposedPorts := map[docker.Port]struct{}{}
for _, port := range network.ListStaticPorts() {
publishedPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
publishedPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
for _, port := range network.ReservedPorts {
publishedPorts[docker.Port(strconv.Itoa(port.Value)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port.Value)}}
publishedPorts[docker.Port(strconv.Itoa(port.Value)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port.Value)}}
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (static)\n", network.IP, port, port)
exposedPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = struct{}{}
exposedPorts[docker.Port(strconv.Itoa(port)+"/udp")] = struct{}{}
exposedPorts[docker.Port(strconv.Itoa(port.Value)+"/tcp")] = struct{}{}
exposedPorts[docker.Port(strconv.Itoa(port.Value)+"/udp")] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: exposed port %d\n", port)
}
for label, port := range network.MapDynamicPorts() {
// If the label is numeric we expect that there is a service
// listening on that port inside the container. In this case we'll
// setup a mapping from our random host port to the label port.
//
// Otherwise we'll setup a direct 1:1 mapping from the host port to
// the container, and assume that the process inside will read the
// environment variable and bind to the correct port.
if _, err := strconv.Atoi(label); err == nil {
publishedPorts[docker.Port(label+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
publishedPorts[docker.Port(label+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %s (mapped)", network.IP, port, label)
exposedPorts[docker.Port(label+"/tcp")] = struct{}{}
exposedPorts[docker.Port(label+"/udp")] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: exposed port %d\n", port)
} else {
publishedPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
publishedPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d for label %s\n", network.IP, port, port, label)
exposedPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = struct{}{}
exposedPorts[docker.Port(strconv.Itoa(port)+"/udp")] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: exposed port %d\n", port)
containerToHostPortMap := make(map[string]int)
for _, port := range network.DynamicPorts {
containerPort, ok := driverConfig.PortMap[port.Label]
if !ok {
containerPort = port.Value
}
cp := strconv.Itoa(containerPort)
hostPort := strconv.Itoa(port.Value)
publishedPorts[docker.Port(cp+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: hostPort}}
publishedPorts[docker.Port(cp+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: hostPort}}
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (mapped)", network.IP, port.Value, containerPort)
exposedPorts[docker.Port(cp+"/tcp")] = struct{}{}
exposedPorts[docker.Port(cp+"/udp")] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: exposed port %d\n", hostPort)
containerToHostPortMap[cp] = port.Value
}
env.SetPorts(containerToHostPortMap)
hostConfig.PortBindings = publishedPorts
config.ExposedPorts = exposedPorts
}
rawArgs, hasArgs := task.Config["args"]
rawArgs := driverConfig.Args
parsedArgs, err := args.ParseAndReplace(rawArgs, env.Map())
if err != nil {
return c, err
@ -291,16 +302,17 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
// If the user specified a custom command to run as their entrypoint, we'll
// inject it here.
if command, ok := task.Config["command"]; ok {
if command := driverConfig.Command; command != "" {
cmd := []string{command}
if hasArgs {
if driverConfig.Args != "" {
cmd = append(cmd, parsedArgs...)
}
config.Cmd = cmd
} else if hasArgs {
} else if driverConfig.Args != "" {
d.logger.Println("[DEBUG] driver.docker: ignoring args because command not specified")
}
config.Env = env.List()
return docker.CreateContainerOptions{
Config: config,
HostConfig: hostConfig,
@ -308,10 +320,14 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (do
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the image from config
image, ok := task.Config["image"]
if !ok || image == "" {
return nil, fmt.Errorf("Image not specified")
var driverConfig dockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
image := driverConfig.ImageName
if err := driverConfig.Validate(); err != nil {
return nil, err
}
if task.Resources == nil {
return nil, fmt.Errorf("Resources are not specified")
@ -361,10 +377,10 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
}
authOptions := docker.AuthConfiguration{
Username: task.Config["auth.username"],
Password: task.Config["auth.password"],
Email: task.Config["auth.email"],
ServerAddress: task.Config["auth.server-address"],
Username: driverConfig.UserName,
Password: driverConfig.Password,
Email: driverConfig.Email,
ServerAddress: driverConfig.ServerAddress,
}
err = client.PullImage(pullOptions, authOptions)
@ -384,7 +400,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[DEBUG] driver.docker: using image %s", dockerImage.ID)
d.logger.Printf("[INFO] driver.docker: identified image %s as %s", image, dockerImage.ID)
config, err := d.createContainer(ctx, task)
config, err := d.createContainer(ctx, task, &driverConfig)
if err != nil {
d.logger.Printf("[ERR] driver.docker: %s", err)
return nil, fmt.Errorf("Failed to create container config for image %s", image)

View file

@ -133,7 +133,6 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.
if len(task.Resources.Networks) > 0 {
network := task.Resources.Networks[0]
env.SetTaskIp(network.IP)
env.SetPorts(network.MapDynamicPorts())
}
}

View file

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
// ExecDriver fork/execs tasks using as many of the underlying OS's isolation
@ -21,6 +22,12 @@ type ExecDriver struct {
DriverContext
fingerprint.StaticFingerprinter
}
// execDriverConfig is the exec driver's task configuration, decoded from
// the task's Config map with mapstructure.WeakDecode in Start. It is also
// reused by the raw_exec driver.
type execDriverConfig struct {
	ArtifactSource string `mapstructure:"artifact_source"` // URL of the artifact to download (closing quote was missing; key was never decoded)
	Checksum       string `mapstructure:"checksum"`        // optional checksum to verify the artifact against
	Command        string `mapstructure:"command"`         // command to execute; required
	Args           string `mapstructure:"args"`            // arguments passed to the command
}
// execHandle is returned from Start/Open as a handle to the PID
type execHandle struct {
@ -49,9 +56,13 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig execDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
command := driverConfig.Command
if command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}
@ -67,8 +78,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
filepath.Join(taskDir, allocdir.TaskLocal),
task.Config["artifact_source"],
task.Config["checksum"],
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
@ -81,7 +92,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Look for arguments
var args []string
if argRaw, ok := task.Config["args"]; ok {
if argRaw := driverConfig.Args; argRaw != "" {
args = append(args, argRaw)
}

View file

@ -21,7 +21,7 @@ var (
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []string{"http"},
DynamicPorts: []structs.Port{structs.Port{Label: "http"}},
},
},
}

View file

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
// JavaDriver is a simple driver to execute applications packaged in Jars.
@ -25,6 +26,13 @@ type JavaDriver struct {
fingerprint.StaticFingerprinter
}
// javaDriverConfig is the java driver's task configuration, decoded from
// the task's Config map with mapstructure.WeakDecode in Start.
type javaDriverConfig struct {
	JvmOpts        string `mapstructure:"jvm_options"`     // options passed to the JVM before -jar
	ArtifactSource string `mapstructure:"artifact_source"` // URL of the jar to download (closing quote was missing; key was never decoded)
	Checksum       string `mapstructure:"checksum"`        // optional checksum to verify the artifact against
	Args           string `mapstructure:"args"`            // arguments appended after the jar name
}
// javaHandle is returned from Start/Open as a handle to the PID
type javaHandle struct {
cmd executor.Executor
@ -90,6 +98,10 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig javaDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
@ -98,8 +110,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Proceed to download an artifact to be executed.
path, err := getter.GetArtifact(
filepath.Join(taskDir, allocdir.TaskLocal),
task.Config["artifact_source"],
task.Config["checksum"],
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
@ -113,15 +125,15 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args := []string{}
// Look for jvm options
jvm_options, ok := task.Config["jvm_options"]
if ok && jvm_options != "" {
jvm_options := driverConfig.JvmOpts
if jvm_options != "" {
d.logger.Printf("[DEBUG] driver.java: found JVM options: %s", jvm_options)
args = append(args, jvm_options)
}
// Build the argument list.
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
if argRaw, ok := task.Config["args"]; ok {
if argRaw := driverConfig.Args; argRaw != "" {
args = append(args, argRaw)
}

View file

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
var (
@ -30,6 +31,13 @@ type QemuDriver struct {
fingerprint.StaticFingerprinter
}
// qemuDriverConfig is the qemu driver's task configuration, decoded from
// the task's Config map with mapstructure.WeakDecode in Start.
type qemuDriverConfig struct {
	ArtifactSource string `mapstructure:"artifact_source"` // URL of the VM image to download (closing quote was missing; key was never decoded)
	Checksum       string `mapstructure:"checksum"`        // optional checksum to verify the artifact against
	Accelerator    string `mapstructure:"accelerator"`     // qemu accelerator; defaults to "tcg" when empty
	GuestPorts     string `mapstructure:"guest_ports"`     // comma separated guest ports, mapped 1:1 to reserved host ports
}
// qemuHandle is returned from Start/Open as a handle to the PID
type qemuHandle struct {
cmd executor.Executor
@ -69,6 +77,10 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
// Run an existing Qemu image. Start() will pull down an existing, valid Qemu
// image and save it to the Drivers Allocation Dir
func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig qemuDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Get the image source
source, ok := task.Config["artifact_source"]
if !ok || source == "" {
@ -90,8 +102,8 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Proceed to download an artifact to be executed.
vmPath, err := getter.GetArtifact(
filepath.Join(taskDir, allocdir.TaskLocal),
task.Config["artifact_source"],
task.Config["checksum"],
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
@ -103,7 +115,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Parse configuration arguments
// Create the base arguments
accelerator := "tcg"
if acc, ok := task.Config["accelerator"]; ok {
if acc := driverConfig.Accelerator; acc != "" {
accelerator = acc
}
// TODO: Check a lower bounds, e.g. the default 128 of Qemu
@ -132,7 +144,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// the Reserved ports in the Task Resources
// Users can supply guest_hosts as a list of posts to map on the guest vm.
// These map 1:1 with the requested Reserved Ports from the hostmachine.
ports := strings.Split(task.Config["guest_ports"], ",")
ports := strings.Split(driverConfig.GuestPorts, ",")
if len(ports) == 0 {
return nil, fmt.Errorf("[ERR] driver.qemu: Error parsing required Guest Ports")
}
@ -149,7 +161,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
reservedPorts := task.Resources.Networks[0].ReservedPorts
var forwarding string
for i, p := range ports {
forwarding = fmt.Sprintf("%s,hostfwd=tcp::%s-:%s", forwarding, strconv.Itoa(reservedPorts[i]), p)
forwarding = fmt.Sprintf("%s,hostfwd=tcp::%s-:%s", forwarding, strconv.Itoa(reservedPorts[i].Value), p)
}
if "" == forwarding {

View file

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
const (
@ -56,6 +57,10 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
}
func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig execDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Get the tasks local directory.
taskName := d.DriverContext.taskName
taskDir, ok := ctx.AllocDir.TaskDirs[taskName]
@ -64,8 +69,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
command := driverConfig.Command
if command == "" {
return nil, fmt.Errorf("missing command for Raw Exec driver")
}
@ -75,8 +80,8 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
filepath.Join(taskDir, allocdir.TaskLocal),
task.Config["artifact_source"],
task.Config["checksum"],
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
@ -89,7 +94,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
// Look for arguments
var args []string
if argRaw, ok := task.Config["args"]; ok {
if argRaw := driverConfig.Args; argRaw != "" {
args = append(args, argRaw)
}

View file

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
var (
@ -34,6 +35,11 @@ type RktDriver struct {
fingerprint.StaticFingerprinter
}
// rktDriverConfig is the rkt driver's task configuration, decoded from
// the task's Config map with mapstructure.WeakDecode in Start.
type rktDriverConfig struct {
// ACI image to run; required.
ImageName string `mapstructure:"image"`
// Arguments passed to rkt after env-var substitution.
Args string `mapstructure:"args"`
}
// rktHandle is returned from Start/Open as a handle to the PID
type rktHandle struct {
proc *os.Process
@ -83,9 +89,13 @@ func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
// Run an existing Rkt image.
func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig rktDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Validate that the config is valid.
img, ok := task.Config["image"]
if !ok || img == "" {
img := driverConfig.ImageName
if img == "" {
return nil, fmt.Errorf("Missing ACI image for rkt")
}
@ -139,7 +149,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
// Add user passed arguments.
if userArgs, ok := task.Config["args"]; ok {
if userArgs := driverConfig.Args; userArgs != "" {
parsed, err := args.ParseAndReplace(userArgs, envVars.Map())
if err != nil {
return nil, err

View file

@ -8,7 +8,6 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
@ -408,6 +407,7 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error {
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &t.Config); err != nil {
return err
}
@ -496,21 +496,14 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error {
return err
}
// Keep track of labels we've already seen so we can ensure there
// are no collisions when we turn them into environment variables.
// lowercase:NomalCase so we can get the first for the error message
seenLabel := map[string]string{}
for _, label := range r.DynamicPorts {
if !reDynamicPorts.MatchString(label) {
return errDynamicPorts
}
first, seen := seenLabel[strings.ToLower(label)]
if seen {
return fmt.Errorf("Found a port label collision: `%s` overlaps with previous `%s`", label, first)
} else {
seenLabel[strings.ToLower(label)] = label
}
var networkObj *ast.ObjectList
if ot, ok := o.Items[0].Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("resource: should be an object")
}
if err := parsePorts(networkObj, &r); err != nil {
return err
}
result.Networks = []*structs.NetworkResource{&r}
@ -519,6 +512,32 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error {
return nil
}
// parsePorts decodes the "port" blocks nested inside a network resource
// stanza. Each port is keyed by a user-supplied label, and labels must be
// unique within the network block. A port with a positive static value is
// recorded as a reserved port; all others become dynamic ports whose value
// is filled in later (see NetworkIndex.AssignNetwork).
func parsePorts(networkObj *ast.ObjectList, nw *structs.NetworkResource) error {
	// Nothing to parse; guards against callers passing a nil object list.
	if networkObj == nil {
		return nil
	}
	portsObjList := networkObj.Filter("Port")
	knownPortLabels := make(map[string]bool)
	for _, port := range portsObjList.Items {
		label := port.Keys[0].Token.Value().(string)
		if knownPortLabels[label] {
			return fmt.Errorf("Found a port label collision: %s", label)
		}
		// Record the label; previously the map was checked but never
		// written, so duplicate labels were silently accepted.
		// NOTE(review): comparison is case-sensitive; the code this
		// replaced lowercased labels to avoid env-var collisions — confirm
		// whether "http" vs "HTTP" should still collide.
		knownPortLabels[label] = true
		var p map[string]interface{}
		var res structs.Port
		if err := hcl.DecodeObject(&p, port.Val); err != nil {
			return err
		}
		if err := mapstructure.WeakDecode(p, &res); err != nil {
			return err
		}
		res.Label = label
		if res.Value > 0 {
			nw.ReservedPorts = append(nw.ReservedPorts, res)
		} else {
			nw.DynamicPorts = append(nw.DynamicPorts, res)
		}
	}
	return nil
}
func parseUpdate(result *structs.UpdateStrategy, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {

View file

@ -37,7 +37,7 @@ func Node() *structs.Node {
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{22},
ReservedPorts: []structs.Port{structs.Port{Label: "main", Value: 22}},
MBits: 1,
},
},
@ -83,7 +83,7 @@ func Job() *structs.Job {
&structs.Task{
Name: "web",
Driver: "exec",
Config: map[string]string{
Config: map[string]interface{}{
"command": "/bin/date",
"args": "+%s",
},
@ -96,7 +96,7 @@ func Job() *structs.Job {
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []string{"http"},
DynamicPorts: []structs.Port{structs.Port{Label: "http"}},
},
},
},
@ -148,7 +148,7 @@ func SystemJob() *structs.Job {
&structs.Task{
Name: "web",
Driver: "exec",
Config: map[string]string{
Config: map[string]interface{}{
"command": "/bin/date",
"args": "+%s",
},
@ -158,7 +158,7 @@ func SystemJob() *structs.Job {
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []string{"http"},
DynamicPorts: []structs.Port{structs.Port{Label: "http"}},
},
},
},
@ -200,9 +200,9 @@ func Alloc() *structs.Allocation {
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{12345},
ReservedPorts: []structs.Port{structs.Port{Label: "main", Value: 12345}},
MBits: 100,
DynamicPorts: []string{"http"},
DynamicPorts: []structs.Port{structs.Port{Label: "http"}},
},
},
},
@ -214,9 +214,9 @@ func Alloc() *structs.Allocation {
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []int{5000},
ReservedPorts: []structs.Port{structs.Port{Label: "main", Value: 5000}},
MBits: 50,
DynamicPorts: []string{"http"},
DynamicPorts: []structs.Port{structs.Port{Label: "http"}},
},
},
},

View file

@ -96,10 +96,10 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) {
idx.UsedPorts[n.IP] = used
}
for _, port := range n.ReservedPorts {
if _, ok := used[port]; ok {
if _, ok := used[port.Value]; ok {
collide = true
} else {
used[port] = struct{}{}
used[port.Value] = struct{}{}
}
}
@ -151,7 +151,7 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
// Check if any of the reserved ports are in use
for _, port := range ask.ReservedPorts {
if _, ok := idx.UsedPorts[ipStr][port]; ok {
if _, ok := idx.UsedPorts[ipStr][port.Value]; ok {
err = fmt.Errorf("reserved port collision")
return
}
@ -179,10 +179,10 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
if _, ok := idx.UsedPorts[ipStr][randPort]; ok {
goto PICK
}
if IntContains(offer.ReservedPorts, randPort) {
if isPortReserved(offer.ReservedPorts, randPort) {
goto PICK
}
offer.ReservedPorts = append(offer.ReservedPorts, randPort)
offer.DynamicPorts[i].Value = randPort
}
// Stop, we have an offer!
@ -194,9 +194,9 @@ func (idx *NetworkIndex) AssignNetwork(ask *NetworkResource) (out *NetworkResour
}
// IntContains scans an integer slice for a value
func IntContains(haystack []int, needle int) bool {
func isPortReserved(haystack []Port, needle int) bool {
for _, item := range haystack {
if item == needle {
if item.Value == needle {
return true
}
}

View file

@ -620,15 +620,20 @@ func (r *Resources) GoString() string {
return fmt.Sprintf("*%#v", *r)
}
// Port pairs a user-chosen label with a port number.
type Port struct {
// Label identifies the port, e.g. "http"; used as the key in port maps.
Label string
// Value is the port number. Set from the jobspec's "static" field for
// reserved ports; for dynamic ports it is assigned during scheduling
// (see NetworkIndex.AssignNetwork).
Value int `mapstructure:"static"`
}
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
Device string // Name of the device
CIDR string // CIDR block of addresses
IP string // IP address
MBits int // Throughput
ReservedPorts []int `mapstructure:"reserved_ports"` // Reserved ports
DynamicPorts []string `mapstructure:"dynamic_ports"` // Dynamically assigned ports
Device string // Name of the device
CIDR string // CIDR block of addresses
IP string // IP address
MBits int // Throughput
ReservedPorts []Port // Reserved ports
DynamicPorts []Port // Dynamically assigned ports
}
// Copy returns a deep copy of the network resource
@ -636,7 +641,7 @@ func (n *NetworkResource) Copy() *NetworkResource {
newR := new(NetworkResource)
*newR = *n
if n.ReservedPorts != nil {
newR.ReservedPorts = make([]int, len(n.ReservedPorts))
newR.ReservedPorts = make([]Port, len(n.ReservedPorts))
copy(newR.ReservedPorts, n.ReservedPorts)
}
return newR
@ -656,52 +661,6 @@ func (n *NetworkResource) GoString() string {
return fmt.Sprintf("*%#v", *n)
}
// MapDynamicPorts returns a mapping of Label:PortNumber for dynamic ports
// allocated on this NetworkResource. The ordering of Label:Port pairs is
// random.
//
// Details:
//
// The jobspec lets us ask for two types of ports: Reserved ports and Dynamic
// ports. Reserved ports are identified by the port number, while Dynamic ports
// are identified by a Label.
//
// When we ask nomad to run a job it checks to see if the Reserved ports we
// requested are available. If they are, it then tries to provision any Dynamic
// ports that we have requested. When available ports are found to satisfy our
// dynamic port requirements, they are APPENDED to the reserved ports list. In
// effect, the reserved ports list serves double-duty. First it indicates the
// ports we *want*, and then it indicates the ports we are *using*.
//
// After the the offer process is complete and the job is scheduled we want to
// see which ports were made available to us. To see the dynamic ports that
// were allocated to us we look at the last N ports in our reservation, where N
// is how many dynamic ports we requested.
//
// MapDynamicPorts matches these port numbers with their labels and gives you
// the port mapping.
//
// Also, be aware that this is intended to be called in the context of
// task.Resources after an offer has been made. If you call it in some other
// context the behavior is unspecified, including maybe crashing. So don't do that.
func (n *NetworkResource) MapDynamicPorts() map[string]int {
ports := n.ReservedPorts[len(n.ReservedPorts)-len(n.DynamicPorts):]
mapping := make(map[string]int, len(n.DynamicPorts))
for idx, label := range n.DynamicPorts {
mapping[label] = ports[idx]
}
return mapping
}
// ListStaticPorts returns the list of Static ports allocated to this
// NetworkResource. These are presumed to have known semantics so there is no
// mapping information.
func (n *NetworkResource) ListStaticPorts() []int {
return n.ReservedPorts[:len(n.ReservedPorts)-len(n.DynamicPorts)]
}
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
@ -1032,7 +991,7 @@ type Task struct {
Driver string
// Config is provided to the driver to initialize
Config map[string]string
Config map[string]interface{}
// Map of environment variables to be used by the driver
Env map[string]string