Merge pull request #4903 from hashicorp/b-delete-versions-mod-while-iter
Fix a panic related to batch GC
This commit is contained in:
commit
7ad8f6c103
|
@ -1126,6 +1126,9 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Put them into a slice so there are no safety concerns while actually
|
||||||
|
// performing the deletes
|
||||||
|
jobs := []*structs.Job{}
|
||||||
for {
|
for {
|
||||||
raw := iter.Next()
|
raw := iter.Next()
|
||||||
if raw == nil {
|
if raw == nil {
|
||||||
|
@ -1138,7 +1141,12 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = txn.DeleteAll("job_version", "id", j.Namespace, j.ID, j.Version); err != nil {
|
jobs = append(jobs, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the deletes
|
||||||
|
for _, j := range jobs {
|
||||||
|
if err := txn.Delete("job_version", j); err != nil {
|
||||||
return fmt.Errorf("deleting job versions failed: %v", err)
|
return fmt.Errorf("deleting job versions failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1661,6 +1661,75 @@ func TestStateStore_DeleteJob_Job(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
|
||||||
|
state := testStateStore(t)
|
||||||
|
|
||||||
|
const testJobCount = 10
|
||||||
|
const jobVersionCount = 4
|
||||||
|
|
||||||
|
stateIndex := uint64(1000)
|
||||||
|
|
||||||
|
jobs := make([]*structs.Job, testJobCount)
|
||||||
|
for i := 0; i < testJobCount; i++ {
|
||||||
|
stateIndex++
|
||||||
|
job := mock.BatchJob()
|
||||||
|
|
||||||
|
err := state.UpsertJob(stateIndex, job)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
jobs[i] = job
|
||||||
|
|
||||||
|
// Create some versions
|
||||||
|
for vi := 1; vi < jobVersionCount; vi++ {
|
||||||
|
stateIndex++
|
||||||
|
|
||||||
|
job := job.Copy()
|
||||||
|
job.TaskGroups[0].Tasks[0].Env = map[string]string{
|
||||||
|
"Version": fmt.Sprintf("%d", vi),
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, state.UpsertJob(stateIndex, job))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
|
||||||
|
// Sanity check that jobs are present in DB
|
||||||
|
job, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, jobs[0].ID, job.ID)
|
||||||
|
|
||||||
|
jobVersions, err := state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, jobVersionCount, len(jobVersions))
|
||||||
|
|
||||||
|
// Actually delete
|
||||||
|
const deletionIndex = uint64(10001)
|
||||||
|
err = state.WithWriteTransaction(func(txn Txn) error {
|
||||||
|
for i, job := range jobs {
|
||||||
|
err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn)
|
||||||
|
require.NoError(t, err, "failed at %d %e", i, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, watchFired(ws))
|
||||||
|
|
||||||
|
ws = memdb.NewWatchSet()
|
||||||
|
out, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, out)
|
||||||
|
|
||||||
|
jobVersions, err = state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, jobVersions)
|
||||||
|
|
||||||
|
index, err := state.Index("jobs")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, deletionIndex, index)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) {
|
func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) {
|
||||||
state := testStateStore(t)
|
state := testStateStore(t)
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
Loading…
Reference in a new issue