diff --git a/api/tasks.go b/api/tasks.go index ab8886724..6b37356ce 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -82,6 +82,12 @@ type Service struct { Checks []ServiceCheck } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + Sticky bool + DiskMB int `mapstructure:"disk"` +} + // TaskGroup is the unit of scheduling. type TaskGroup struct { Name string @@ -89,6 +95,7 @@ type TaskGroup struct { Constraints []*Constraint Tasks []*Task RestartPolicy *RestartPolicy + LocalDisk *LocalDisk Meta map[string]string } @@ -121,6 +128,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup { return g } +// RequireDisk adds a local disk to the task group +func (g *TaskGroup) RequireDisk(disk *LocalDisk) *TaskGroup { + g.LocalDisk = disk + return g +} + // LogConfig provides configuration for log rotation type LogConfig struct { MaxFiles int diff --git a/api/util_test.go b/api/util_test.go index c91ae0738..ab2cefe80 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -1,8 +1,6 @@ package api -import ( - "testing" -) +import "testing" func assertQueryMeta(t *testing.T, qm *QueryMeta) { if qm.LastIndex == 0 { @@ -25,7 +23,6 @@ func testJob() *Job { Require(&Resources{ CPU: 100, MemoryMB: 256, - DiskMB: 25, IOPS: 10, }). SetLogConfig(&LogConfig{ @@ -34,7 +31,10 @@ func testJob() *Job { }) group := NewTaskGroup("group1", 1). - AddTask(task) + AddTask(task). + RequireDisk(&LocalDisk{ + DiskMB: 25, + }) job := NewBatchJob("job1", "redis", "region1", 1). AddDatacenter("dc1"). diff --git a/command/plan_test.go b/command/plan_test.go index 433531aa1..3bd7511ba 100644 --- a/command/plan_test.go +++ b/command/plan_test.go @@ -85,7 +85,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -125,7 +124,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } diff --git a/command/run_test.go b/command/run_test.go index 840925e45..f8a5212be 100644 --- a/command/run_test.go +++ b/command/run_test.go @@ -32,7 +32,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -121,7 +120,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -171,7 +169,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } diff --git a/command/util_test.go b/command/util_test.go index 00fdd339d..3a0479cf6 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -45,7 +45,6 @@ func testJob(jobID string) *api.Job { SetConfig("exit_code", 0). Require(&api.Resources{ MemoryMB: 256, - DiskMB: 20, CPU: 100, }). SetLogConfig(&api.LogConfig{ @@ -54,7 +53,10 @@ func testJob(jobID string) *api.Job { }) group := api.NewTaskGroup("group1", 1). - AddTask(task) + AddTask(task). + RequireDisk(&api.LocalDisk{ + DiskMB: 20, + }) job := api.NewBatchJob(jobID, jobID, "region1", 1). AddDatacenter("dc1"). 
diff --git a/command/validate_test.go b/command/validate_test.go index ff9f42b5e..11be97511 100644 --- a/command/validate_test.go +++ b/command/validate_test.go @@ -32,7 +32,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } @@ -126,7 +125,6 @@ job "job1" { driver = "exec" resources = { cpu = 1000 - disk = 150 memory = 512 } } diff --git a/jobspec/parse.go b/jobspec/parse.go index b2d8eb8f7..37fa461e4 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -190,9 +190,10 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2) for i, t := range tasks { result.TaskGroups[i] = &structs.TaskGroup{ - Name: t.Name, - Count: 1, - Tasks: []*structs.Task{t}, + Name: t.Name, + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), + Tasks: []*structs.Task{t}, } } } @@ -240,6 +241,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { "restart", "meta", "task", + "local_disk", } if err := checkHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -253,6 +255,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { delete(m, "meta") delete(m, "task") delete(m, "restart") + delete(m, "local_disk") // Default count to 1 if not specified if _, ok := m["count"]; !ok { @@ -280,6 +283,14 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error { } } + // Parse local disk + g.LocalDisk = structs.DefaultLocalDisk() + if o := listVal.Filter("local_disk"); len(o.Items) > 0 { + if err := parseLocalDisk(&g.LocalDisk, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', local_disk ->", n)) + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -417,6 +428,38 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error return nil } +func parseLocalDisk(result **structs.LocalDisk, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'local_disk' block allowed") + } + + // Get our local_disk object + obj := list.Items[0] + + // Check for invalid keys + valid := []string{ + "sticky", + "disk", + } + if err := checkHCLKeys(obj.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + var localDisk structs.LocalDisk + if err := mapstructure.WeakDecode(m, &localDisk); err != nil { + return err + } + *result = &localDisk + + return nil +} + // parseBool takes an interface value and tries to convert it to a boolean and // returns an error if the type can't be converted. 
func parseBool(value interface{}) (bool, error) { @@ -835,7 +878,6 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ "cpu", - "disk", "iops", "memory", "network", diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 59ccfc0b2..d429551dc 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -49,8 +49,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "outside", - Count: 1, + Name: "outside", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "outside", @@ -87,6 +88,10 @@ func TestParse(t *testing.T) { Delay: 15 * time.Second, Mode: "delay", }, + LocalDisk: &structs.LocalDisk{ + Sticky: true, + DiskMB: 150, + }, Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -123,7 +128,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 500, MemoryMB: 128, - DiskMB: 300, IOPS: 0, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ @@ -168,7 +172,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 500, MemoryMB: 128, - DiskMB: 300, IOPS: 30, }, Constraints: []*structs.Constraint{ @@ -313,8 +316,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "bar", - Count: 1, + Name: "bar", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "bar", @@ -356,8 +360,9 @@ func TestParse(t *testing.T) { TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "binsl", - Count: 1, + Name: "binsl", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "binstore", @@ -365,7 +370,6 @@ func TestParse(t *testing.T) { Resources: &structs.Resources{ CPU: 100, MemoryMB: 10, - DiskMB: 300, IOPS: 0, }, LogConfig: &structs.LogConfig{ @@ -406,8 +410,9 @@ func TestParse(t *testing.T) { Region: "global", TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "group", - Count: 1, + Name: "group", + Count: 1, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "task", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index ed52a6a29..4371b7d7a 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -42,6 +42,11 @@ job "binstore-storagelocker" { mode = "delay" } + local_disk { + sticky = true + disk = 150 + } + task "binstore" { driver = "docker" user = "bob" diff --git a/jobspec/test-fixtures/multi-resource.hcl b/jobspec/test-fixtures/multi-resource.hcl index 3058ea485..5e3197487 100644 --- a/jobspec/test-fixtures/multi-resource.hcl +++ b/jobspec/test-fixtures/multi-resource.hcl @@ -1,5 +1,11 @@ job "binstore-storagelocker" { group "binsl" { + local_disk { + disk = 500 + } + local_disk { + disk = 100 + } count = 5 task "binstore" { driver = "docker" diff --git a/nomad/fsm.go b/nomad/fsm.go index 318ad8bf3..c0783e18c 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -390,6 +390,14 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { // denormalized prior to being inserted into MemDB. 
for _, alloc := range req.Alloc { if alloc.Resources != nil { + // COMPAT 0.4.1 -> 0.5 + // Set the shared resources for allocations which don't have them + if alloc.SharedResources == nil { + alloc.SharedResources = &structs.Resources{ + DiskMB: alloc.Resources.DiskMB, + } + } + continue } @@ -397,6 +405,9 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { for _, task := range alloc.TaskResources { alloc.Resources.Add(task) } + + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) } if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 805e365c4..1c3bd230d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -644,6 +644,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { alloc.AllocModifyIndex = out.AllocModifyIndex // Resources should be recomputed + resources.DiskMB = alloc.Job.TaskGroups[0].LocalDisk.DiskMB alloc.Resources = resources if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) @@ -933,6 +934,35 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { } } +func TestFSM_SnapshotRestore_Allocs_NoSharedResources(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + alloc1.SharedResources = nil + alloc2.SharedResources = nil + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) + state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) + + // Verify the contents + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out1, _ := state2.AllocByID(alloc1.ID) + out2, _ := state2.AllocByID(alloc2.ID) + alloc1.SharedResources = &structs.Resources{DiskMB: 150} + alloc2.SharedResources = &structs.Resources{DiskMB: 150} + + if !reflect.DeepEqual(alloc1, out1) { + t.Fatalf("bad: \n%#v\n%#v", out1, alloc1) + } + if !reflect.DeepEqual(alloc2, out2) { + t.Fatalf("bad: \n%#v\n%#v", out2, alloc2) + } +} + func TestFSM_SnapshotRestore_Indexes(t *testing.T) { // Add some state fsm := testFSM(t) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 937d09783..4d3c129f2 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -79,6 +79,9 @@ func Job() *structs.Job { &structs.TaskGroup{ Name: "web", Count: 10, + LocalDisk: &structs.LocalDisk{ + DiskMB: 150, + }, RestartPolicy: &structs.RestartPolicy{ Attempts: 3, Interval: 10 * time.Minute, @@ -120,7 +123,6 @@ func Job() *structs.Job { Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 150, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ MBits: 50, @@ -178,6 +180,7 @@ func SystemJob() *structs.Job { Delay: 1 * time.Minute, Mode: structs.RestartPolicyModeDelay, }, + LocalDisk: structs.DefaultLocalDisk(), Tasks: []*structs.Task{ &structs.Task{ Name: "web", @@ -255,7 +258,7 @@ func Alloc() *structs.Allocation { Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 10, + DiskMB: 150, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", @@ -270,7 +273,6 @@ func Alloc() *structs.Allocation { "web": &structs.Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 10, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", @@ -282,6 +284,9 @@ func Alloc() *structs.Allocation { }, }, }, + SharedResources: &structs.Resources{ + DiskMB: 150, + }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, 
ClientStatus: structs.AllocClientStatusPending, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index cee5935f3..a5a09d3ae 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -357,6 +357,24 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for _, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } + var diskMB int + for _, task := range tg.Tasks { + if task.Resources != nil { + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } @@ -1690,6 +1708,25 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { func (r *StateRestore) JobRestore(job *structs.Job) error { r.items.Add(watch.Item{Table: "jobs"}) r.items.Add(watch.Item{Job: job.ID}) + + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for _, tg := range job.TaskGroups { + if tg.LocalDisk != nil { + continue + } + var diskMB int + for _, task := range tg.Tasks { + if task.Resources != nil { + diskMB += task.Resources.DiskMB + task.Resources.DiskMB = 0 + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + } + if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } @@ -1713,6 +1750,15 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { r.items.Add(watch.Item{AllocEval: alloc.EvalID}) r.items.Add(watch.Item{AllocJob: alloc.JobID}) r.items.Add(watch.Item{AllocNode: alloc.NodeID}) + + // COMPAT 0.4.1 -> 0.5 + // Set the shared resources if it's not present + if alloc.SharedResources == nil { + alloc.SharedResources = &structs.Resources{ + DiskMB: alloc.Resources.DiskMB, + } + } + if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f272b9fb1..ddd31353f 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -433,6 +433,40 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { notify.verify(t) } +// This test ensures that UpsertJob creates the LocalDisk if a job doesn't have +// one and clears out the task's disk resource asks +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set the LocalDisk to nil and set the task's DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect the state store to create the LocalDisk and clear out the task's + // DiskMB + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("bad: %#v %#v", expected, out) + } +} + func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() @@ -813,6 +847,51 @@ func TestStateStore_RestoreJob(t *testing.T) { notify.verify(t) } +// 
This test ensures that the state restore creates the LocalDisk for a job if +// it doesn't have one +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_Jobs_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set LocalDisk to nil and set the DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) + + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = restore.JobRestore(job) + if err != nil { + t.Fatalf("err: %v", err) + } + restore.Commit() + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect job to have local disk and clear out the task's disk resource ask + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + if !reflect.DeepEqual(out, expected) { + t.Fatalf("Bad: %#v %#v", out, job) + } + + notify.verify(t) +} + func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index eabc3ec4b..132215ae2 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -223,6 +223,12 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er diff.Objects = append(diff.Objects, rDiff) } + // LocalDisk diff + diskDiff := primitiveObjectDiff(tg.LocalDisk, other.LocalDisk, nil, "LocalDisk", contextual) + if diskDiff != nil { + diff.Objects = append(diff.Objects, diskDiff) + } + // Tasks diff tasks, err := taskDiffs(tg.Tasks, other.Tasks, contextual) if err != nil { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index eec7979b2..edd73a913 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -1276,6 +1276,151 @@ func TestTaskGroupDiff(t *testing.T) { }, }, }, + { + // LocalDisk added + Old: &TaskGroup{}, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 100, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "DiskMB", + Old: "", + New: "100", + }, + { + Type: DiffTypeAdded, + Name: "Sticky", + Old: "", + New: "true", + }, + }, + }, + }, + }, + }, + { + // LocalDisk deleted + Old: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 100, + }, + }, + New: &TaskGroup{}, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeDeleted, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "DiskMB", + Old: "100", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "Sticky", + Old: "true", + New: "", + }, + }, + }, + }, + }, + }, + { + // LocalDisk edited + Old: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 150, + }, + }, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: false, + DiskMB: 90, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "DiskMB", + Old: "150", + New: "90", + }, + + { + Type: DiffTypeEdited, + Name: "Sticky", + Old: "true", + New: "false", + }, + }, + }, + }, + }, + }, + { + // LocalDisk edited with context + Contextual: true, + Old: &TaskGroup{ + LocalDisk: 
&LocalDisk{ + Sticky: false, + DiskMB: 100, + }, + }, + New: &TaskGroup{ + LocalDisk: &LocalDisk{ + Sticky: true, + DiskMB: 90, + }, + }, + Expected: &TaskGroupDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "LocalDisk", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "DiskMB", + Old: "100", + New: "90", + }, + + { + Type: DiffTypeEdited, + Name: "Sticky", + Old: "false", + New: "true", + }, + }, + }, + }, + }, + }, { // Tasks edited Old: &TaskGroup{ diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 37df72c33..08da1bd85 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -63,6 +63,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st return false, "", nil, err } } else if alloc.TaskResources != nil { + + // Add the shared resource ask of the allocation to the used + // resources + if err := used.Add(alloc.SharedResources); err != nil { + return false, "", nil, err + } // Allocations within the plan have the combined resources stripped // to save space, so sum up the individual task resources. for _, taskResource := range alloc.TaskResources { diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index be34a8308..ad5d044e0 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -61,6 +61,14 @@ func TestAllocsFit_PortsOvercommitted(t *testing.T) { } a1 := &Allocation{ + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "web", + LocalDisk: DefaultLocalDisk(), + }, + }, + }, TaskResources: map[string]*Resources{ "web": &Resources{ Networks: []*NetworkResource{ diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 058ae16d3..791bdcedd 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -772,7 +772,6 @@ func DefaultResources() *Resources { return &Resources{ CPU: 100, MemoryMB: 10, - DiskMB: 300, IOPS: 0, } } @@ -823,9 +822,6 @@ func (r *Resources) MeetsMinResources() error { if r.MemoryMB < 10 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum MemoryMB value is 10; got %d", r.MemoryMB)) } - if r.DiskMB < 10 { - mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum DiskMB value is 10; got %d", r.DiskMB)) - } if r.IOPS < 0 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum IOPS value is 0; got %d", r.IOPS)) } @@ -1541,6 +1537,9 @@ type TaskGroup struct { // Tasks are the collection of tasks that this task group needs to run Tasks []*Task + // LocalDisk describes the disk resources that the task group requests + LocalDisk *LocalDisk + // Meta is used to associate arbitrary metadata with this // task group. This is opaque to Nomad. 
Meta map[string]string @@ -1565,6 +1564,10 @@ func (tg *TaskGroup) Copy() *TaskGroup { } ntg.Meta = CopyMapStringString(ntg.Meta) + + if tg.LocalDisk != nil { + ntg.LocalDisk = tg.LocalDisk.Copy() + } return ntg } @@ -1613,6 +1616,14 @@ func (tg *TaskGroup) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + if tg.LocalDisk != nil { + if err := tg.LocalDisk.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } else { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a local disk object", tg.Name)) + } + // Check for duplicate tasks tasks := make(map[string]int) for idx, task := range tg.Tasks { @@ -1627,7 +1638,7 @@ func (tg *TaskGroup) Validate() error { // Validate the tasks for _, task := range tg.Tasks { - if err := task.Validate(); err != nil { + if err := task.Validate(tg.LocalDisk); err != nil { outer := fmt.Errorf("Task %s validation failed: %s", task.Name, err) mErr.Errors = append(mErr.Errors, outer) } @@ -2025,7 +2036,7 @@ func (t *Task) FindHostAndPortFor(portLabel string) (string, int) { } // Validate is used to sanity check a task -func (t *Task) Validate() error { +func (t *Task) Validate(localDisk *LocalDisk) error { var mErr multierror.Error if t.Name == "" { mErr.Errors = append(mErr.Errors, errors.New("Missing task name")) @@ -2049,6 +2060,13 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } + // Ensure the task isn't asking for disk resources + if t.Resources != nil { + if t.Resources.DiskMB > 0 { + mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) + } + } + // Validate the log config if t.LogConfig == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) @@ -2068,12 +2086,12 @@ func (t *Task) Validate() error { mErr.Errors = append(mErr.Errors, err) } - if t.LogConfig != nil && t.Resources != nil { + if t.LogConfig != nil && localDisk != nil { logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) - if t.Resources.DiskMB <= logUsage { + if localDisk.DiskMB <= logUsage { mErr.Errors = append(mErr.Errors, fmt.Errorf("log storage (%d MB) must be less than requested disk capacity (%d MB)", - logUsage, t.Resources.DiskMB)) + logUsage, localDisk.DiskMB)) } } @@ -2552,6 +2570,37 @@ func (c *Constraint) Validate() error { return mErr.ErrorOrNil() } +// LocalDisk is an ephemeral disk object +type LocalDisk struct { + // Sticky indicates whether the allocation is sticky to a node + Sticky bool + + // DiskMB is the size of the local disk + DiskMB int `mapstructure:"disk"` +} + +// DefaultLocalDisk returns a LocalDisk with default configurations +func DefaultLocalDisk() *LocalDisk { + return &LocalDisk{ + DiskMB: 300, + } +} + +// Validate validates LocalDisk +func (d *LocalDisk) Validate() error { + if d.DiskMB < 10 { + return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB) + } + return nil +} + +// Copy copies the LocalDisk struct and returns a new one +func (d *LocalDisk) Copy() *LocalDisk { + ld := new(LocalDisk) + *ld = *d + return ld +} + // Vault stores the set of premissions a task needs access to from Vault. type Vault struct { // Policies is the set of policies that the task needs access to @@ -2623,6 +2672,10 @@ type Allocation struct { // of this allocation of the task group. 
Resources *Resources + // SharedResources are the resources that are shared by all the tasks in an + // allocation + SharedResources *Resources + // TaskResources is the set of resources allocated to each // task. These should sum to the total Resources. TaskResources map[string]*Resources @@ -2670,6 +2723,7 @@ func (a *Allocation) Copy() *Allocation { na.Job = na.Job.Copy() na.Resources = na.Resources.Copy() + na.SharedResources = na.SharedResources.Copy() if a.TaskResources != nil { tr := make(map[string]*Resources, len(na.TaskResources)) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index db9857571..214788529 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -115,8 +115,9 @@ func testJob() *Job { }, TaskGroups: []*TaskGroup{ &TaskGroup{ - Name: "web", - Count: 10, + Name: "web", + Count: 10, + LocalDisk: DefaultLocalDisk(), RestartPolicy: &RestartPolicy{ Mode: RestartPolicyModeFail, Attempts: 3, @@ -147,7 +148,6 @@ func testJob() *Job { Resources: &Resources{ CPU: 500, MemoryMB: 256, - DiskMB: 20, Networks: []*NetworkResource{ &NetworkResource{ MBits: 50, @@ -345,20 +345,24 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate() mErr = err.(*multierror.Error) - if !strings.Contains(mErr.Errors[0].Error(), "2 redefines 'web' from task 1") { + if !strings.Contains(mErr.Errors[0].Error(), "should have a local disk object") { t.Fatalf("err: %s", err) } - if !strings.Contains(mErr.Errors[1].Error(), "Task 3 missing name") { + if !strings.Contains(mErr.Errors[1].Error(), "2 redefines 'web' from task 1") { t.Fatalf("err: %s", err) } - if !strings.Contains(mErr.Errors[2].Error(), "Task web validation failed") { + if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") { + t.Fatalf("err: %s", err) + } + if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") { t.Fatalf("err: %s", err) } } func TestTask_Validate(t *testing.T) { task := &Task{} - err := task.Validate() + localDisk := DefaultLocalDisk() + err := task.Validate(localDisk) mErr := err.(*multierror.Error) if !strings.Contains(mErr.Errors[0].Error(), "task name") { t.Fatalf("err: %s", err) @@ -371,7 +375,7 @@ func TestTask_Validate(t *testing.T) { } task = &Task{Name: "web/foo"} - err = task.Validate() + err = task.Validate(localDisk) mErr = err.(*multierror.Error) if !strings.Contains(mErr.Errors[0].Error(), "slashes") { t.Fatalf("err: %s", err) @@ -382,13 +386,13 @@ func TestTask_Validate(t *testing.T) { Driver: "docker", Resources: &Resources{ CPU: 100, - DiskMB: 200, MemoryMB: 100, IOPS: 10, }, LogConfig: DefaultLogConfig(), } - err = task.Validate() + localDisk.DiskMB = 200 + err = task.Validate(localDisk) if err != nil { t.Fatalf("err: %s", err) } @@ -416,18 +420,20 @@ func TestTask_Validate_Services(t *testing.T) { Name: "service-name", } + localDisk := DefaultLocalDisk() task := &Task{ Name: "web", Driver: "docker", Resources: &Resources{ CPU: 100, - DiskMB: 200, MemoryMB: 100, IOPS: 10, }, Services: []*Service{s1, s2}, } - err := task.Validate() + localDisk.DiskMB = 200 + + err := task.Validate(localDisk) if err == nil { t.Fatal("expected an error") } @@ -494,12 +500,12 @@ func TestTask_Validate_Service_Check(t *testing.T) { func TestTask_Validate_LogConfig(t *testing.T) { task := &Task{ LogConfig: DefaultLogConfig(), - Resources: &Resources{ - DiskMB: 1, - }, + } + localDisk := &LocalDisk{ + DiskMB: 1, } - err := task.Validate() + err := task.Validate(localDisk) mErr := err.(*multierror.Error) if 
!strings.Contains(mErr.Errors[3].Error(), "log storage") { t.Fatalf("err: %s", err) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 48b1c65b4..40590c2bb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -455,6 +455,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + + SharedResources: &structs.Resources{ + DiskMB: missing.TaskGroup.LocalDisk.DiskMB, + }, } // If the new allocation is replacing an older allocation then we diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index a5b37c988..03fe89b11 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -50,8 +50,8 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Ensure the eval has no spawned blocked eval - if len(h.Evals) != 1 { - t.Fatalf("bad: %#v", h.Evals) + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals) if h.Evals[0].BlockedEval != "" { t.Fatalf("bad: %#v", h.Evals[0]) } } @@ -91,6 +91,71 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job with count 2 and a disk ask of 88GB so that only one allocation + // can fit + job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].LocalDisk.DiskMB = 88 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has a blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan allocated only one allocation + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure only one allocation was placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/rank.go b/scheduler/rank.go index b3610078c..9834e9689 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -131,11 +131,11 @@ func (iter *StaticRankIterator) Reset() { // BinPackIterator is a RankIterator that scores potential options // based on a bin-packing algorithm. 
type BinPackIterator struct { - ctx Context - source RankIterator - evict bool - priority int - tasks []*structs.Task + ctx Context + source RankIterator + evict bool + priority int + taskGroup *structs.TaskGroup } // NewBinPackIterator returns a BinPackIterator which tries to fit tasks @@ -154,8 +154,8 @@ func (iter *BinPackIterator) SetPriority(p int) { iter.priority = p } -func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) { - iter.tasks = tasks +func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) { + iter.taskGroup = taskGroup } func (iter *BinPackIterator) Next() *RankedNode { @@ -182,8 +182,10 @@ OUTER: netIdx.AddAllocs(proposed) // Assign the resources for each task - total := new(structs.Resources) - for _, task := range iter.tasks { + total := &structs.Resources{ + DiskMB: iter.taskGroup.LocalDisk.DiskMB, + } + for _, task := range iter.taskGroup.Tasks { taskResources := task.Resources.Copy() // Check if we need a network resource diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index f33cd5521..071df4983 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -68,16 +68,20 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } - binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 2 { @@ -142,16 +146,21 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { }, } - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 1 { @@ -223,16 +232,20 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } - binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out := collectRanked(binp) if len(out) != 1 { @@ -307,16 +320,21 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { plan := ctx.Plan() plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1} - task := &structs.Task{ - Name: "web", - Resources: &structs.Resources{ - CPU: 1024, - MemoryMB: 1024, + taskGroup := &structs.TaskGroup{ + LocalDisk: &structs.LocalDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + }, + }, }, } binp := NewBinPackIterator(ctx, static, false, 0) - binp.SetTasks([]*structs.Task{task}) + binp.SetTaskGroup(taskGroup) out 
:= collectRanked(binp) if len(out) != 2 { diff --git a/scheduler/stack.go b/scheduler/stack.go index 98681980e..bea8c91e6 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -154,7 +154,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.proposedAllocConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) - s.binPack.SetTasks(tg.Tasks) + s.binPack.SetTaskGroup(tg) // Find the node with the max score option := s.maxScore.Next() @@ -242,7 +242,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.binPack.SetTasks(tg.Tasks) + s.binPack.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) // Get the next option that satisfies the constraints. diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index df1badf36..098560e35 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -319,6 +319,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { TaskResources: option.TaskResources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + + SharedResources: &structs.Resources{ + DiskMB: missing.TaskGroup.LocalDisk.DiskMB, + }, } // If the new allocation is replacing an older allocation then we diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 3ad4fd852..043fbe854 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -80,6 +80,68 @@ func TestSystemSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.SystemJob() + job.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create another job with a large disk resource ask so that it doesn't fit + // on the node + job1 := mock.SystemJob() + job1.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024 + noErr(t, h.State.UpsertJob(h.NextIndex(), job1)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewSystemScheduler, eval); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + // Create a new harness to test the scheduling result for the second job + h1 := NewHarnessWithState(t, h.State) + // Create a mock evaluation to register the job + eval1 := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job1.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job1.ID, + } + + // Process the evaluation + if err := h1.Process(NewSystemScheduler, eval1); err != nil { + t.Fatalf("err: %v", err) + } + + out, err = h1.State.AllocsByJob(job1.ID) + noErr(t, err) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } +} + func TestSystemSched_ExhaustResources(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go 
b/scheduler/util.go index accc13a13..0629686d5 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -540,7 +540,7 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { c := tgConstrainTuple{ constraints: make([]*structs.Constraint, 0, len(tg.Constraints)), drivers: make(map[string]struct{}), - size: new(structs.Resources), + size: &structs.Resources{DiskMB: tg.LocalDisk.DiskMB}, } c.constraints = append(c.constraints, tg.Constraints...) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index e57db59b2..a63858613 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -846,6 +846,7 @@ func TestTaskGroupConstraints(t *testing.T) { Name: "web", Count: 10, Constraints: []*structs.Constraint{constr}, + LocalDisk: &structs.LocalDisk{}, Tasks: []*structs.Task{ &structs.Task{ Driver: "exec",
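
Usage note (illustrative, not part of the patch): with this change, disk is requested once per task group through the new LocalDisk object (the local_disk stanza in HCL, or RequireDisk in the api package) instead of through DiskMB on each task's Resources. Below is a minimal Go sketch against the api package, mirroring the updated testJob helpers in api/util_test.go and command/util_test.go above; the job, group, and task names are made up for illustration.

package main

import "github.com/hashicorp/nomad/api"

func main() {
	// Tasks no longer ask for DiskMB in their Resources; disk is a group-level ask.
	task := api.NewTask("example", "exec").
		Require(&api.Resources{
			CPU:      100,
			MemoryMB: 256,
		})

	// RequireDisk attaches the group's LocalDisk (ephemeral disk) request.
	group := api.NewTaskGroup("group1", 1).
		AddTask(task).
		RequireDisk(&api.LocalDisk{
			Sticky: true,
			DiskMB: 150,
		})

	// Assemble a batch job in region1/dc1, as the test helpers above do.
	_ = api.NewBatchJob("job1", "example", "region1", 1).
		AddDatacenter("dc1").
		AddTaskGroup(group)
}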