Merge pull request #46 from hashicorp/f-job-parser

Job Spec Parser
This commit is contained in:
Armon Dadgar 2015-09-15 10:16:28 -07:00
commit cd50f9112b
7 changed files with 660 additions and 5 deletions

363
jobspec/parse.go Normal file
View file

@ -0,0 +1,363 @@
package jobspec
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"github.com/hashicorp/hcl"
hclobj "github.com/hashicorp/hcl/hcl"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
// Parse parses the job spec from the given io.Reader.
//
// Due to current internal limitations, the entire contents of the
// io.Reader will be copied into memory first before parsing.
func Parse(r io.Reader) (*structs.Job, error) {
	// HCL can only parse a complete document, so slurp the reader
	// into memory before handing it over.
	var raw bytes.Buffer
	if _, err := io.Copy(&raw, r); err != nil {
		return nil, err
	}

	root, err := hcl.Parse(raw.String())
	if err != nil {
		return nil, fmt.Errorf("error parsing: %s", err)
	}
	raw.Reset()

	// A top-level "job" stanza is mandatory.
	jobObj := root.Get("job", false)
	if jobObj == nil {
		return nil, fmt.Errorf("'job' stanza not found")
	}

	var job structs.Job
	if err := parseJob(&job, jobObj); err != nil {
		return nil, fmt.Errorf("error parsing 'job': %s", err)
	}

	return &job, nil
}
// ParseFile parses the given path as a job spec.
func ParseFile(path string) (*structs.Job, error) {
	abs, err := filepath.Abs(path)
	if err != nil {
		return nil, err
	}

	file, err := os.Open(abs)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	return Parse(file)
}
// parseJob decodes a single top-level "job" stanza into result.
// Exactly one job block is allowed per spec; the block's HCL key
// becomes the job name.
func parseJob(result *structs.Job, obj *hclobj.Object) error {
	if obj.Len() > 1 {
		return fmt.Errorf("only one 'job' block allowed")
	}

	// Get our job object
	obj = obj.Elem(true)[0]

	// Decode the full thing into a map[string]interface for ease
	var m map[string]interface{}
	if err := hcl.DecodeObject(&m, obj); err != nil {
		return err
	}
	// "constraint" and "meta" get dedicated parsing below, so strip
	// them before the generic WeakDecode pass.
	delete(m, "constraint")
	delete(m, "meta")

	// Set the name to the object key
	result.Name = obj.Key

	// Defaults — set before WeakDecode so explicit values in the
	// spec override them.
	result.Priority = 50
	result.Region = "global"
	result.Type = "service"

	// Decode the rest
	if err := mapstructure.WeakDecode(m, result); err != nil {
		return err
	}

	// Parse constraints
	if o := obj.Get("constraint", false); o != nil {
		if err := parseConstraints(&result.Constraints, o); err != nil {
			return err
		}
	}

	// Parse out meta fields. These are in HCL as a list so we need
	// to iterate over them and merge them.
	if metaO := obj.Get("meta", false); metaO != nil {
		for _, o := range metaO.Elem(false) {
			var m map[string]interface{}
			if err := hcl.DecodeObject(&m, o); err != nil {
				return err
			}
			if err := mapstructure.WeakDecode(m, &result.Meta); err != nil {
				return err
			}
		}
	}

	// If we have tasks outside, do those: each bare task becomes its
	// own single-task group (count 1) named after the task.
	if o := obj.Get("task", false); o != nil {
		var tasks []*structs.Task
		if err := parseTasks(&tasks, o); err != nil {
			return err
		}

		result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
		for i, t := range tasks {
			result.TaskGroups[i] = &structs.TaskGroup{
				Name:  t.Name,
				Count: 1,
				Tasks: []*structs.Task{t},
			}
		}
	}

	// Parse the task groups (appended after any implicit groups above)
	if o := obj.Get("group", false); o != nil {
		if err := parseGroups(result, o); err != nil {
			return fmt.Errorf("error parsing 'group': %s", err)
		}
	}

	return nil
}
// parseGroups decodes every "group" stanza and appends the resulting
// task groups to result.TaskGroups. Duplicate group names are an error.
func parseGroups(result *structs.Job, obj *hclobj.Object) error {
	// Flatten the HCL element list into a name -> object index,
	// rejecting duplicates along the way.
	index := make(map[string]*hclobj.Object)
	for _, outer := range obj.Elem(false) {
		for _, inner := range outer.Elem(true) {
			if _, dup := index[inner.Key]; dup {
				return fmt.Errorf(
					"group '%s' defined more than once",
					inner.Key)
			}
			index[inner.Key] = inner
		}
	}
	if len(index) == 0 {
		return nil
	}

	// Turn each indexed object into an actual task group.
	groups := make([]*structs.TaskGroup, 0, len(index))
	for name, raw := range index {
		var fields map[string]interface{}
		if err := hcl.DecodeObject(&fields, raw); err != nil {
			return err
		}

		// These stanzas get dedicated parsing below.
		delete(fields, "constraint")
		delete(fields, "meta")
		delete(fields, "task")

		// Basic decode for the scalar fields.
		var group structs.TaskGroup
		group.Name = name
		if err := mapstructure.WeakDecode(fields, &group); err != nil {
			return err
		}

		if c := raw.Get("constraint", false); c != nil {
			if err := parseConstraints(&group.Constraints, c); err != nil {
				return err
			}
		}

		// Meta blocks arrive as an HCL list; merge every entry.
		if metaList := raw.Get("meta", false); metaList != nil {
			for _, entry := range metaList.Elem(false) {
				var kv map[string]interface{}
				if err := hcl.DecodeObject(&kv, entry); err != nil {
					return err
				}
				if err := mapstructure.WeakDecode(kv, &group.Meta); err != nil {
					return err
				}
			}
		}

		if taskObj := raw.Get("task", false); taskObj != nil {
			if err := parseTasks(&group.Tasks, taskObj); err != nil {
				return err
			}
		}

		groups = append(groups, &group)
	}

	result.TaskGroups = append(result.TaskGroups, groups...)
	return nil
}
// parseConstraints decodes each constraint block and appends it to
// result. The HCL keys attribute/value/operator are mapped onto the
// LTarget/RTarget/Operand struct fields; a missing operator defaults
// to "=".
func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
	for _, entry := range obj.Elem(false) {
		var fields map[string]interface{}
		if err := hcl.DecodeObject(&fields, entry); err != nil {
			return err
		}

		// Translate the user-facing keys to the struct field names.
		fields["LTarget"] = fields["attribute"]
		fields["RTarget"] = fields["value"]
		fields["Operand"] = fields["operator"]

		// Build the constraint
		var constraint structs.Constraint
		if err := mapstructure.WeakDecode(fields, &constraint); err != nil {
			return err
		}
		if constraint.Operand == "" {
			constraint.Operand = "="
		}

		*result = append(*result, &constraint)
	}

	return nil
}
// parseTasks decodes every "task" stanza under obj and appends the
// resulting tasks to result, preserving declaration order. Duplicate
// task names are an error.
func parseTasks(result *[]*structs.Task, obj *hclobj.Object) error {
	// Get all the maps of keys to the actual object, keeping a set of
	// seen names so duplicates can be rejected.
	objects := make([]*hclobj.Object, 0, 5)
	set := make(map[string]struct{})
	for _, o1 := range obj.Elem(false) {
		for _, o2 := range o1.Elem(true) {
			if _, ok := set[o2.Key]; ok {
				// BUG FIX: this message previously said "group",
				// but this function parses tasks.
				return fmt.Errorf(
					"task '%s' defined more than once",
					o2.Key)
			}

			objects = append(objects, o2)
			set[o2.Key] = struct{}{}
		}
	}
	if len(objects) == 0 {
		return nil
	}

	for _, o := range objects {
		var m map[string]interface{}
		if err := hcl.DecodeObject(&m, o); err != nil {
			return err
		}

		// These stanzas get dedicated parsing below.
		delete(m, "config")
		delete(m, "constraint")
		delete(m, "meta")
		delete(m, "resources")

		// Build the task with the basic decode
		var t structs.Task
		t.Name = o.Key
		if err := mapstructure.WeakDecode(m, &t); err != nil {
			return err
		}

		// If we have config, then parse that; config blocks are an HCL
		// list, so merge every entry.
		if o := o.Get("config", false); o != nil {
			for _, o := range o.Elem(false) {
				var m map[string]interface{}
				if err := hcl.DecodeObject(&m, o); err != nil {
					return err
				}

				if err := mapstructure.WeakDecode(m, &t.Config); err != nil {
					return err
				}
			}
		}

		// Parse constraints
		if o := o.Get("constraint", false); o != nil {
			if err := parseConstraints(&t.Constraints, o); err != nil {
				return err
			}
		}

		// Parse out meta fields. These are in HCL as a list so we need
		// to iterate over them and merge them.
		if metaO := o.Get("meta", false); metaO != nil {
			for _, o := range metaO.Elem(false) {
				var m map[string]interface{}
				if err := hcl.DecodeObject(&m, o); err != nil {
					return err
				}
				if err := mapstructure.WeakDecode(m, &t.Meta); err != nil {
					return err
				}
			}
		}

		// If we have resources, then parse that
		if o := o.Get("resources", false); o != nil {
			var r structs.Resources
			if err := parseResources(&r, o); err != nil {
				return fmt.Errorf("task '%s': %s", t.Name, err)
			}

			t.Resources = &r
		}

		*result = append(*result, &t)
	}

	return nil
}
// parseResources decodes a "resources" stanza into result, including
// an optional nested "network" block. At most one resources block per
// task and one network block per resources block are allowed.
func parseResources(result *structs.Resources, obj *hclobj.Object) error {
	if obj.Len() > 1 {
		// BUG FIX: the stanza is spelled "resources" in the spec; the
		// message previously said 'resource'.
		return fmt.Errorf("only one 'resources' block allowed per task")
	}

	for _, o := range obj.Elem(false) {
		var m map[string]interface{}
		if err := hcl.DecodeObject(&m, o); err != nil {
			return err
		}
		// "network" gets dedicated parsing below.
		delete(m, "network")
		if err := mapstructure.WeakDecode(m, result); err != nil {
			return err
		}

		// Parse the network resources
		if o := o.Get("network", false); o != nil {
			if o.Len() > 1 {
				return fmt.Errorf("only one 'network' resource allowed")
			}

			var r structs.NetworkResource
			var m map[string]interface{}
			if err := hcl.DecodeObject(&m, o); err != nil {
				return err
			}
			if err := mapstructure.WeakDecode(m, &r); err != nil {
				return err
			}

			result.Networks = []*structs.NetworkResource{&r}
		}
	}

	return nil
}

157
jobspec/parse_test.go Normal file
View file

@ -0,0 +1,157 @@
package jobspec
import (
"path/filepath"
"reflect"
"testing"
"github.com/hashicorp/nomad/nomad/structs"
)
// TestParse is a table-driven test that parses each fixture file and
// compares the result against an expected *structs.Job (or an expected
// error for invalid specs).
func TestParse(t *testing.T) {
	cases := []struct {
		File   string
		Result *structs.Job
		Err    bool
	}{
		{
			"basic.hcl",
			&structs.Job{
				Name:        "binstore-storagelocker",
				Type:        "service",
				Priority:    50,
				AllAtOnce:   true,
				Datacenters: []string{"us2", "eu1"},
				Region:      "global",

				Meta: map[string]string{
					"foo": "bar",
				},

				Constraints: []*structs.Constraint{
					&structs.Constraint{
						LTarget: "kernel.os",
						RTarget: "windows",
						Operand: "=",
					},
				},

				TaskGroups: []*structs.TaskGroup{
					&structs.TaskGroup{
						Name:  "outside",
						Count: 1,
						Tasks: []*structs.Task{
							&structs.Task{
								Name:   "outside",
								Driver: "java",
								Config: map[string]string{
									"jar": "s3://my-cool-store/foo.jar",
								},
								Meta: map[string]string{
									"my-cool-key": "foobar",
								},
							},
						},
					},

					&structs.TaskGroup{
						Name:  "binsl",
						Count: 5,
						Constraints: []*structs.Constraint{
							&structs.Constraint{
								LTarget: "kernel.os",
								RTarget: "linux",
								Operand: "=",
							},
						},
						Meta: map[string]string{
							"elb_mode":     "tcp",
							"elb_interval": "10",
							"elb_checks":   "3",
						},
						Tasks: []*structs.Task{
							&structs.Task{
								Name:   "binstore",
								Driver: "docker",
								Config: map[string]string{
									"image": "hashicorp/binstore",
								},
								Resources: &structs.Resources{
									CPU:      500,
									MemoryMB: 128,
									Networks: []*structs.NetworkResource{
										&structs.NetworkResource{
											MBits:         100,
											ReservedPorts: []int{1, 2, 3},
											DynamicPorts:  3,
										},
									},
								},
							},
							&structs.Task{
								Name:   "storagelocker",
								Driver: "java",
								Config: map[string]string{
									"image": "hashicorp/storagelocker",
								},
								Resources: &structs.Resources{
									CPU:      500,
									MemoryMB: 128,
								},
								Constraints: []*structs.Constraint{
									&structs.Constraint{
										LTarget: "kernel.arch",
										RTarget: "amd64",
										Operand: "=",
									},
								},
							},
						},
					},
				},
			},
			false,
		},

		{
			"multi-network.hcl",
			nil,
			true,
		},

		{
			"multi-resource.hcl",
			nil,
			true,
		},

		{
			"default-job.hcl",
			&structs.Job{
				Name:     "foo",
				Priority: 50,
				Region:   "global",
				Type:     "service",
			},
			false,
		},
	}

	for _, tc := range cases {
		path, err := filepath.Abs(filepath.Join("./test-fixtures", tc.File))
		if err != nil {
			// BUG FIX: these used t.Fatalf followed by continue, which
			// made the continue unreachable (go vet: unreachable code)
			// and aborted all remaining table cases on first failure.
			// Errorf + continue reports the failure and keeps going.
			t.Errorf("file: %s\n\n%s", tc.File, err)
			continue
		}

		actual, err := ParseFile(path)
		if (err != nil) != tc.Err {
			t.Errorf("file: %s\n\n%s", tc.File, err)
			continue
		}

		if !reflect.DeepEqual(actual, tc.Result) {
			t.Errorf("file: %s\n\n%#v\n\n%#v", tc.File, actual, tc.Result)
		}
	}
}

View file

@ -0,0 +1,72 @@
job "binstore-storagelocker" {
region = "global"
type = "service"
priority = 50
all_at_once = true
datacenters = ["us2", "eu1"]
meta {
foo = "bar"
}
constraint {
attribute = "kernel.os"
value = "windows"
}
task "outside" {
driver = "java"
config {
jar = "s3://my-cool-store/foo.jar"
}
meta {
my-cool-key = "foobar"
}
}
group "binsl" {
count = 5
task "binstore" {
driver = "docker"
config {
image = "hashicorp/binstore"
}
resources {
cpu = 500
memory = 128
network {
mbits = "100"
reserved_ports = [1,2,3]
dynamic_ports = 3
}
}
}
task "storagelocker" {
driver = "java"
config {
image = "hashicorp/storagelocker"
}
resources {
cpu = 500
memory = 128
}
constraint {
attribute = "kernel.arch"
value = "amd64"
}
}
constraint {
attribute = "kernel.os"
value = "linux"
}
meta {
elb_mode = "tcp"
elb_interval = 10
elb_checks = 3
}
}
}

View file

@ -0,0 +1 @@
job "foo" {}

View file

@ -0,0 +1,25 @@
job "binstore-storagelocker" {
group "binsl" {
count = 5
task "binstore" {
driver = "docker"
resources {
cpu = 500
memory = 128
network {
mbits = "100"
reserved_ports = [1,2,3]
dynamic_ports = 3
}
network {
mbits = "128"
reserved_ports = [1,2,3]
dynamic_ports = 3
}
}
}
}
}

View file

@ -0,0 +1,18 @@
job "binstore-storagelocker" {
group "binsl" {
count = 5
task "binstore" {
driver = "docker"
resources {
cpu = 500
memory = 128
}
resources {
cpu = 500
memory = 128
}
}
}
}

View file

@ -532,8 +532,8 @@ type NodeListStub struct {
// on a client
type Resources struct {
	CPU      float64
	MemoryMB int `mapstructure:"memory"` // decoded from the HCL "memory" key
	DiskMB   int `mapstructure:"disk"`   // decoded from the HCL "disk" key
	IOPS     int
	Networks []*NetworkResource
}
@ -602,15 +602,19 @@ func (r *Resources) Add(delta *Resources) error {
return nil
}
// GoString implements fmt.GoStringer: %#v on a *Resources prints the
// dereferenced struct contents rather than a bare pointer address.
func (r *Resources) GoString() string {
	return fmt.Sprintf("*%#v", *r)
}
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
	Device        string // Name of the device
	CIDR          string // CIDR block of addresses
	IP            string // IP address
	ReservedPorts []int  `mapstructure:"reserved_ports"` // Reserved ports
	MBits         int    // Throughput
	DynamicPorts  int    `mapstructure:"dynamic_ports"` // Dynamically assigned ports
}
// Copy returns a deep copy of the network resource
@ -634,6 +638,10 @@ func (n *NetworkResource) Add(delta *NetworkResource) {
n.DynamicPorts += delta.DynamicPorts
}
// GoString implements fmt.GoStringer: %#v on a *NetworkResource prints
// the dereferenced struct contents rather than a bare pointer address.
func (n *NetworkResource) GoString() string {
	return fmt.Sprintf("*%#v", *n)
}
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
@ -692,11 +700,14 @@ type Job struct {
// AllAtOnce is used to control if incremental scheduling of task groups
// is allowed or if we must do a gang scheduling of the entire job. This
// can slow down larger jobs if resources are not available.
AllAtOnce bool
AllAtOnce bool `mapstructure:"all_at_once"`
// Datacenters contains all the datacenters this job is allowed to span
Datacenters []string
// Region is the Nomad region that handles scheduling this job
Region string
// Constraints can be specified at a job level and apply to
// all the task groups and tasks.
Constraints []*Constraint
@ -807,6 +818,10 @@ func (tg *TaskGroup) LookupTask(name string) *Task {
return nil
}
// GoString implements fmt.GoStringer: %#v on a *TaskGroup prints the
// dereferenced struct contents rather than a bare pointer address.
func (tg *TaskGroup) GoString() string {
	return fmt.Sprintf("*%#v", *tg)
}
// Task is a single process typically that is executed as part of a task group.
type Task struct {
// Name of the task
@ -830,6 +845,10 @@ type Task struct {
Meta map[string]string
}
// GoString implements fmt.GoStringer: %#v on a *Task prints the
// dereferenced struct contents rather than a bare pointer address.
func (t *Task) GoString() string {
	return fmt.Sprintf("*%#v", *t)
}
// Constraints are used to restrict placement options in the case of
// a hard constraint, and used to prefer a placement in the case of
// a soft constraint.