From d156f32f948f9b2c2ab38eeca401b65237222995 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 24 Aug 2016 13:51:15 -0500 Subject: [PATCH] Implemented job spec parsing for sticky volumes --- api/tasks.go | 7 +++++ jobspec/parse.go | 50 ++++++++++++++++++++++++++++++--- jobspec/parse_test.go | 24 ++++++++++------ jobspec/test-fixtures/basic.hcl | 5 ++++ nomad/structs/structs.go | 31 ++++++++++++++++++++ 5 files changed, 105 insertions(+), 12 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index ab8886724..fff9d79bf 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -82,6 +82,12 @@ type Service struct { Checks []ServiceCheck } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + Sticky bool + DiskMB int `mapstructure:"disk"` +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name string @@ -89,6 +95,7 @@ type TaskGroup struct { Constraints []*Constraint Tasks []*Task RestartPolicy *RestartPolicy + LocalDisk *LocalDisk Meta map[string]string } diff --git a/jobspec/parse.go b/jobspec/parse.go index b2d8eb8f7..37fa461e4 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -190,9 +190,10 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2) for i, t := range tasks { result.TaskGroups[i] = &structs.TaskGroup{ - Name: t.Name, - Count: 1, - Tasks: []*structs.Task{t}, + Name: t.Name, + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), + Tasks: []*structs.Task{t}, } } } @@ -240,6 +241,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { "restart", "meta", "task", + "local_disk", } if err := checkHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -253,6 +255,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { delete(m, "meta") delete(m, "task") delete(m, "restart") + delete(m, "local_disk") // Default count to 1 if not specified if _, ok := m["count"]; !ok { @@ -280,6 +283,14 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { } } + // Parse local disk + g.LocalDisk = structs.DefaultLocalDisk() + if o := listVal.Filter("local_disk"); len(o.Items) > 0 { + if err := parseLocalDisk(&g.LocalDisk, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', local_disk ->", n)) + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -417,6 +428,38 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error return nil } +func parseLocalDisk(result **structs.LocalDisk, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'local_disk' block allowed") + } + + // Get our local_disk object + obj := list.Items[0] + + // Check for invalid keys + valid := []string{ + "sticky", + "disk", + } + if err := checkHCLKeys(obj.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + var localDisk structs.LocalDisk + if err := mapstructure.WeakDecode(m, &localDisk); err != nil { + return err + } + *result = &localDisk + + return nil +} + // parseBool takes an interface value and tries to convert it to a boolean and // returns an error if the type can't be converted. func parseBool(value interface{}) (bool, error) { @@ -835,7 +878,6 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ "cpu", - "disk", "iops", "memory", "network", diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 59ccfc0b2..e8f8821c7 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -49,8 +49,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "outside", - Count: 1, + Name: "outside", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "outside", @@ -87,6 +88,10 @@ func TestParse(t *testing.T) { Delay: 15 * time.Second, Mode: "delay", }, + LocalDisk: &structs.LocalDisk{ + Sticky: true, + DiskMB: 150, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -313,8 +318,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "bar", - Count: 1, + Name: "bar", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "bar", @@ -356,8 +362,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "binsl", - Count: 1, + Name: "binsl", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -406,8 +413,9 @@ func TestParse(t *testing.T) { Region: "global", TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "group", - Count: 1, + Name: "group", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "task", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index ed52a6a29..4371b7d7a 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -42,6 +42,11 @@ job "binstore-storagelocker" { mode = "delay" } + local_disk { + sticky = true + disk = 150 + } + task "binstore" { driver = "docker" user = "bob" diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 058ae16d3..8576aea50 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1541,6 +1541,9 @@ type TaskGroup struct { // Tasks are the collection of tasks that this task group needs to run Tasks []*Task + // LocalDisk is the disk resources that the task group requests + LocalDisk *LocalDisk + // Meta is used to associate arbitrary metadata with this // task group. This is opaque to Nomad. Meta map[string]string @@ -1613,6 +1616,14 @@ func (tg *TaskGroup) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + if tg.LocalDisk != nil { + if err := tg.LocalDisk.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } else { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a local disk object", tg.Name)) + } + // Check for duplicate tasks tasks := make(map[string]int) for idx, task := range tg.Tasks { @@ -2552,6 +2563,26 @@ func (c *Constraint) Validate() error { return mErr.ErrorOrNil() } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + Sticky bool + DiskMB int `mapstructure:"disk"` +} + +// DefaultLocalDisk returns a LocalDisk with default configurations +func DefaultLocalDisk() *LocalDisk { + return &LocalDisk{ + DiskMB: 300, + } +} + +func (d *LocalDisk) Validate() error { + if d.DiskMB < 10 { + return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB) + } + return nil +} + // Vault stores the set of premissions a task needs access to from Vault. type Vault struct { // Policies is the set of policies that the task needs access to