Rollback memtable flush upon atomic flush fail (#4641)

Summary:
This fixes an assertion.

An atomic flush can have multiple flush jobs. Some of them may fail. If any of
them fails, we need to rollback all of them.
For the flush jobs that do fail, we already call `RollbackMemTableFlush` in
`FlushJob::Run`. The tricky part is for flush jobs that have completed
successfully. We need to call `RollbackMemTableFlush` for them as well.

The newly added DBAtomicFlushTest.AtomicFlushRollbackSomeJobs will SigAbort
without the corresponding change in AtomicFlushMemTablesToOutputFiles.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4641

Differential Revision: D12943649

Pulled By: riversand963

fbshipit-source-id: c66a4a664a1e0938e938fd41edc5a70c34cdd868
This commit is contained in:
Yanqin Jin 2018-11-14 20:52:21 -08:00 committed by Facebook Github Bot
parent 6bee36a786
commit 147697420a
2 changed files with 67 additions and 5 deletions

View File

@ -355,6 +355,52 @@ TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
{"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions wopts;
wopts.disableWAL = true;
for (size_t i = 0; i != num_cfs; ++i) {
int cf_id = static_cast<int>(i);
ASSERT_OK(Put(cf_id, "key", "value", wopts));
}
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
for (auto* cfh : handles_) {
dbfull()->TEST_WaitForFlushMemTable(cfh);
}
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}
fault_injection_env->SetFilesystemActive(true);
Destroy(options);
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

View File

@ -307,7 +307,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
assert(num_cfs == static_cast<int>(jobs.size()));
for (int i = 0; i != num_cfs; ++i) {
file_meta.emplace_back(FileMetaData());
file_meta.emplace_back();
#ifndef ROCKSDB_LITE
const MutableCFOptions& mutable_cf_options =
@ -335,7 +335,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (s.ok()) {
// TODO (yanqin): parallelize jobs with threads.
for (int i = 0; i != num_cfs; ++i) {
for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
@ -344,6 +344,20 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
break;
}
}
if (num_cfs > 1) {
TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
}
if (s.ok()) {
exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;
if (!exec_status[0].second.ok()) {
s = exec_status[0].second;
}
}
}
if (s.ok()) {
@ -428,9 +442,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
if (!s.IsShutdownInProgress()) {
for (int i = 0; i != num_cfs; ++i) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
}
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);