Added an upgrade path for existing jobs with no local disk
This commit is contained in:
parent
ec73c768f1
commit
8105613c25
|
@ -357,6 +357,25 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
|
|||
return fmt.Errorf("unable to create job summary: %v", err)
|
||||
}
|
||||
|
||||
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
|
||||
// from task resources
|
||||
for i, tg := range job.TaskGroups {
|
||||
var diskMB int
|
||||
for j, task := range tg.Tasks {
|
||||
if task.Resources != nil {
|
||||
resources := task.Resources
|
||||
diskMB += resources.DiskMB
|
||||
resources.DiskMB = 0
|
||||
task.Resources = resources
|
||||
tg.Tasks[j] = task
|
||||
}
|
||||
}
|
||||
tg.LocalDisk = &structs.LocalDisk{
|
||||
DiskMB: diskMB,
|
||||
}
|
||||
job.TaskGroups[i] = tg
|
||||
}
|
||||
|
||||
// Insert the job
|
||||
if err := txn.Insert("jobs", job); err != nil {
|
||||
return fmt.Errorf("job insert failed: %v", err)
|
||||
|
@ -1690,6 +1709,26 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error {
|
|||
func (r *StateRestore) JobRestore(job *structs.Job) error {
|
||||
r.items.Add(watch.Item{Table: "jobs"})
|
||||
r.items.Add(watch.Item{Job: job.ID})
|
||||
|
||||
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
|
||||
// from task resources
|
||||
for i, tg := range job.TaskGroups {
|
||||
var diskMB int
|
||||
for j, task := range tg.Tasks {
|
||||
if task.Resources != nil {
|
||||
resources := task.Resources
|
||||
diskMB += resources.DiskMB
|
||||
resources.DiskMB = 0
|
||||
task.Resources = resources
|
||||
tg.Tasks[j] = task
|
||||
}
|
||||
}
|
||||
tg.LocalDisk = &structs.LocalDisk{
|
||||
DiskMB: diskMB,
|
||||
}
|
||||
job.TaskGroups[i] = tg
|
||||
}
|
||||
|
||||
if err := r.txn.Insert("jobs", job); err != nil {
|
||||
return fmt.Errorf("job insert failed: %v", err)
|
||||
}
|
||||
|
|
|
@ -433,6 +433,69 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
|
|||
notify.verify(t)
|
||||
}
|
||||
|
||||
// This test ensures that UpsertJob creates the LocalDisk is a job doesn't have
|
||||
// one and clear out the task's disk resource asks
|
||||
// COMPAT 0.4.1 -> 0.5
|
||||
func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
job := mock.Job()
|
||||
|
||||
// Set the LocalDisk to nil and set the tasks's DiskMB to 150
|
||||
job.TaskGroups[0].LocalDisk = nil
|
||||
job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150
|
||||
|
||||
notify := setupNotifyTest(
|
||||
state,
|
||||
watch.Item{Table: "jobs"},
|
||||
watch.Item{Job: job.ID})
|
||||
|
||||
err := state.UpsertJob(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
out, err := state.JobByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Expect the state store to create the LocalDisk and clear out Tasks's
|
||||
// DiskMB
|
||||
expected := job.Copy()
|
||||
expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{
|
||||
DiskMB: 150,
|
||||
}
|
||||
expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0
|
||||
|
||||
if !reflect.DeepEqual(expected, out) {
|
||||
t.Fatalf("bad: %#v %#v", expected, out)
|
||||
}
|
||||
|
||||
index, err := state.Index("jobs")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if index != 1000 {
|
||||
t.Fatalf("bad: %d", index)
|
||||
}
|
||||
|
||||
summary, err := state.JobSummaryByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if summary == nil {
|
||||
t.Fatalf("nil summary")
|
||||
}
|
||||
if summary.JobID != job.ID {
|
||||
t.Fatalf("bad summary id: %v", summary.JobID)
|
||||
}
|
||||
_, ok := summary.Summary["web"]
|
||||
if !ok {
|
||||
t.Fatalf("nil summary for task group")
|
||||
}
|
||||
notify.verify(t)
|
||||
}
|
||||
|
||||
func TestStateStore_DeleteJob_Job(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
job := mock.Job()
|
||||
|
@ -813,6 +876,51 @@ func TestStateStore_RestoreJob(t *testing.T) {
|
|||
notify.verify(t)
|
||||
}
|
||||
|
||||
// This test ensures that the state restore creates the LocalDisk for a job if
|
||||
// it doesn't have one
|
||||
// COMPAT 0.4.1 -> 0.5
|
||||
func TestStateStore_Jobs_NoLocalDisk(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
job := mock.Job()
|
||||
|
||||
// Set LocalDisk to nil and set the DiskMB to 150
|
||||
job.TaskGroups[0].LocalDisk = nil
|
||||
job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150
|
||||
|
||||
notify := setupNotifyTest(
|
||||
state,
|
||||
watch.Item{Table: "jobs"},
|
||||
watch.Item{Job: job.ID})
|
||||
|
||||
restore, err := state.Restore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = restore.JobRestore(job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
restore.Commit()
|
||||
|
||||
out, err := state.JobByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Expect job to have local disk and clear out the task's disk resource ask
|
||||
expected := job.Copy()
|
||||
expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{
|
||||
DiskMB: 150,
|
||||
}
|
||||
expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0
|
||||
if !reflect.DeepEqual(out, expected) {
|
||||
t.Fatalf("Bad: %#v %#v", out, job)
|
||||
}
|
||||
|
||||
notify.verify(t)
|
||||
}
|
||||
|
||||
func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
job := mock.Job()
|
||||
|
|
Loading…
Reference in New Issue