From 8105613c25064bf3dbfbf1b20ba508766b8f12e1 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 25 Aug 2016 13:00:20 -0500 Subject: [PATCH] Added an upgrade path for existing jobs with no local disk --- nomad/state/state_store.go | 39 ++++++++++++ nomad/state/state_store_test.go | 108 ++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index cee5935f3..47d304c19 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -357,6 +357,25 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for i, tg := range job.TaskGroups { + var diskMB int + for j, task := range tg.Tasks { + if task.Resources != nil { + resources := task.Resources + diskMB += resources.DiskMB + resources.DiskMB = 0 + task.Resources = resources + tg.Tasks[j] = task + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + job.TaskGroups[i] = tg + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -1690,6 +1709,26 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error { func (r *StateRestore) JobRestore(job *structs.Job) error { r.items.Add(watch.Item{Table: "jobs"}) r.items.Add(watch.Item{Job: job.ID}) + + // COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB + // from task resources + for i, tg := range job.TaskGroups { + var diskMB int + for j, task := range tg.Tasks { + if task.Resources != nil { + resources := task.Resources + diskMB += resources.DiskMB + resources.DiskMB = 0 + task.Resources = resources + tg.Tasks[j] = task + } + } + tg.LocalDisk = &structs.LocalDisk{ + DiskMB: diskMB, + } + job.TaskGroups[i] = tg + } + if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f272b9fb1..2312abee2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -433,6 +433,69 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { notify.verify(t) } +// This test ensures that UpsertJob creates the LocalDisk is a job doesn't have +// one and clear out the task's disk resource asks +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set the LocalDisk to nil and set the tasks's DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) + + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect the state store to create the LocalDisk and clear out Tasks's + // DiskMB + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("bad: %#v %#v", expected, out) + } + + index, err := state.Index("jobs") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + summary, err := state.JobSummaryByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if summary == nil { + t.Fatalf("nil summary") + } + if summary.JobID != job.ID { + t.Fatalf("bad summary id: %v", summary.JobID) + } + _, ok := summary.Summary["web"] + if !ok { + t.Fatalf("nil summary for task group") + } + notify.verify(t) +} + func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() @@ -813,6 +876,51 @@ func TestStateStore_RestoreJob(t *testing.T) { notify.verify(t) } +// This test ensures that the state restore creates the LocalDisk for a job if +// it doesn't have one +// COMPAT 0.4.1 -> 0.5 +func TestStateStore_Jobs_NoLocalDisk(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + + // Set LocalDisk to nil and set the DiskMB to 150 + job.TaskGroups[0].LocalDisk = nil + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150 + + notify := setupNotifyTest( + state, + watch.Item{Table: "jobs"}, + watch.Item{Job: job.ID}) + + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = restore.JobRestore(job) + if err != nil { + t.Fatalf("err: %v", err) + } + restore.Commit() + + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Expect job to have local disk and clear out the task's disk resource ask + expected := job.Copy() + expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{ + DiskMB: 150, + } + expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0 + if !reflect.DeepEqual(out, expected) { + t.Fatalf("Bad: %#v %#v", out, job) + } + + notify.verify(t) +} + func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job()