Release compaction files in manifest write callback (#11764)

Summary:
Fixes https://github.com/facebook/rocksdb/issues/10257 (also see [here](https://github.com/facebook/rocksdb/pull/10355#issuecomment-1684308556)) by releasing compaction files earlier when writing to manifest in LogAndApply().  This is done by passing in a [callback](ba59751430/db/version_set.h (L1199)) to LogAndApply(). The new Version is created in the same critical section where compaction files are released. When compaction picker is picking compaction based on the new version, these compaction files will already be released.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11764

Test Plan:
* Existing unit tests
* A repro unit test to validate that compaction files are released: `./db_compaction_test --gtest_filter=DBCompactionTest.ReleaseCompactionDuringManifestWrite`
* `python3 ./tools/db_crashtest.py --simple whitebox` with some assertions to check compaction files are released

Reviewed By: ajkr

Differential Revision: D48742152

Pulled By: cbi42

fbshipit-source-id: 7560fd0e723a63fe692234015d2b96850f8b5d77
This commit is contained in:
Changyu Bi 2023-09-18 13:11:53 -07:00 committed by Facebook GitHub Bot
parent 920d72e6fa
commit cc254efea6
7 changed files with 171 additions and 20 deletions

View File

@ -844,7 +844,8 @@ Status CompactionJob::Run() {
return status;
}
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released) {
assert(compact_);
AutoThreadOperationStageUpdater stage_updater(
@ -860,7 +861,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
status = InstallCompactionResults(mutable_cf_options, compaction_released);
}
if (!versions_->io_status().ok()) {
io_status_ = versions_->io_status();
@ -1697,7 +1698,7 @@ Status CompactionJob::FinishCompactionOutputFile(
}
Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, bool* compaction_released) {
assert(compact_);
db_mutex_->AssertHeld();
@ -1779,9 +1780,15 @@ Status CompactionJob::InstallCompactionResults(
}
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, read_options, edit,
db_mutex_, db_directory_);
auto manifest_wcb = [&compaction, &compaction_released](const Status& s) {
compaction->ReleaseCompactionFiles(s);
*compaction_released = true;
};
return versions_->LogAndApply(
compaction->column_family_data(), mutable_cf_options, read_options, edit,
db_mutex_, db_directory_, /*new_descriptor_log=*/false,
/*column_family_options=*/nullptr, manifest_wcb);
}
void CompactionJob::RecordCompactionIOStats() {

View File

@ -186,7 +186,10 @@ class CompactionJob {
// REQUIRED: mutex held
// Add compaction input/output to the current version
Status Install(const MutableCFOptions& mutable_cf_options);
// Releases compaction file through Compaction::ReleaseCompactionFiles().
// Sets *compaction_released to true if compaction is released.
Status Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
// Return the IO status
IOStatus io_status() const { return io_status_; }
@ -273,7 +276,8 @@ class CompactionJob {
const Slice& next_table_min_key,
const Slice* comp_start_user_key,
const Slice* comp_end_user_key);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs);
void UpdateCompactionJobStats(

View File

@ -674,7 +674,9 @@ class CompactionJobTestBase : public testing::Test {
ASSERT_OK(s);
ASSERT_OK(compaction_job.io_status());
mutex_.Lock();
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
bool compaction_released = false;
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(),
&compaction_released));
ASSERT_OK(compaction_job.io_status());
mutex_.Unlock();
log_buffer.FlushBufferToLog();

View File

@ -10114,6 +10114,112 @@ TEST_F(DBCompactionTest, ErrorWhenReadFileHead) {
}
}
TEST_F(DBCompactionTest, ReleaseCompactionDuringManifestWrite) {
// Tests the fix for issue #10257.
// Compactions are released in LogAndApply() so that picking a compaction
// from the new Version won't see these compactions as registered.
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
// Make sure we can run multiple compactions at the same time.
env_->SetBackgroundThreads(3, Env::Priority::LOW);
env_->SetBackgroundThreads(3, Env::Priority::BOTTOM);
options.max_background_compactions = 3;
options.num_levels = 4;
DestroyAndReopen(options);
Random rnd(301);
// Construct the following LSM
// L2: [K1-K2] [K10-K11] [k100-k101]
// L3: [K1] [K10] [k100]
// We will have 3 threads to run 3 manual compactions.
// The first thread that writes to MANIFEST will not finish
// until the next two threads enters LogAndApply() and form
// a write group.
// We check that compactions are all released after the first
// thread from the write group finishes writing to MANIFEST.
// L3
ASSERT_OK(Put(Key(1), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_OK(Put(Key(10), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_OK(Put(Key(100), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(3);
// L2
ASSERT_OK(Put(Key(100), rnd.RandomString(20)));
ASSERT_OK(Put(Key(101), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put(Key(1), rnd.RandomString(20)));
ASSERT_OK(Put(Key(2), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put(Key(10), rnd.RandomString(20)));
ASSERT_OK(Put(Key(11), rnd.RandomString(20)));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 3);
ASSERT_EQ(NumTableFilesAtLevel(3), 3);
SyncPoint::GetInstance()->ClearAllCallBacks();
std::atomic_int count = 0;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:BeforeWriterWaiting", [&](void*) {
int c = count.fetch_add(1);
if (c == 2) {
TEST_SYNC_POINT("all threads to enter LogAndApply");
}
});
SyncPoint::GetInstance()->LoadDependency(
{{"all threads to enter LogAndApply",
"VersionSet::LogAndApply:WriteManifestStart"}});
// Verify that compactions are released after writing to MANIFEST
std::atomic_int after_compact_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:AfterCompaction", [&](void* ptr) {
int c = after_compact_count.fetch_add(1);
if (c > 0) {
ColumnFamilyData* cfd = (ColumnFamilyData*)(ptr);
ASSERT_TRUE(
cfd->compaction_picker()->compactions_in_progress()->empty());
}
});
SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::thread> threads;
threads.emplace_back(std::thread([&]() {
std::string k1_str = Key(1);
std::string k2_str = Key(2);
Slice k1 = k1_str;
Slice k2 = k2_str;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &k1, &k2));
}));
threads.emplace_back(std::thread([&]() {
std::string k10_str = Key(10);
std::string k11_str = Key(11);
Slice k10 = k10_str;
Slice k11 = k11_str;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &k10, &k11));
}));
std::string k100_str = Key(100);
std::string k101_str = Key(101);
Slice k100 = k100_str;
Slice k101 = k101_str;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &k100, &k101));
for (auto& thread : threads) {
thread.join();
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -1512,7 +1512,12 @@ Status DBImpl::CompactFilesImpl(
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();
Status status = compaction_job.Install(*c->mutable_cf_options());
bool compaction_released = false;
Status status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
if (!compaction_released) {
c->ReleaseCompactionFiles(s);
}
if (status.ok()) {
assert(compaction_job.io_status().ok());
InstallSuperVersionAndScheduleWork(c->column_family_data(),
@ -1523,7 +1528,6 @@ Status DBImpl::CompactFilesImpl(
// not check compaction_job.io_status() explicitly if we're not calling
// SetBGError
compaction_job.io_status().PermitUncheckedError();
c->ReleaseCompactionFiles(s);
// Need to make sure SstFileManager does its bookkeeping
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
@ -3388,8 +3392,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
std::unique_ptr<TaskLimiterToken> task_token;
// InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage;
bool sfm_reserved_compact_space = false;
if (is_manual) {
ManualCompactionState* m = manual_compaction;
@ -3525,6 +3527,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
IOStatus io_s;
bool compaction_released = false;
if (!c) {
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
@ -3547,7 +3550,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
status = versions_->LogAndApply(
c->column_family_data(), *c->mutable_cf_options(), read_options,
c->edit(), &mutex_, directories_.GetDbDir());
c->edit(), &mutex_, directories_.GetDbDir(),
/*new_descriptor_log=*/false, /*column_family_options=*/nullptr,
[&c, &compaction_released](const Status& s) {
c->ReleaseCompactionFiles(s);
compaction_released = true;
});
io_s = versions_->io_status();
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
@ -3613,7 +3621,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
status = versions_->LogAndApply(
c->column_family_data(), *c->mutable_cf_options(), read_options,
c->edit(), &mutex_, directories_.GetDbDir());
c->edit(), &mutex_, directories_.GetDbDir(),
/*new_descriptor_log=*/false, /*column_family_options=*/nullptr,
[&c, &compaction_released](const Status& s) {
c->ReleaseCompactionFiles(s);
compaction_released = true;
});
io_s = versions_->io_status();
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork(c->column_family_data(),
@ -3663,6 +3676,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Transfer requested token, so it doesn't need to do it again.
ca->prepicked_compaction->task_token = std::move(task_token);
++bg_bottom_compaction_scheduled_;
assert(c == nullptr);
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
this, &DBImpl::UnscheduleCompactionCallback);
} else {
@ -3706,8 +3720,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
status = compaction_job.Install(*c->mutable_cf_options());
status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
io_s = compaction_job.io_status();
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
@ -3726,7 +3740,23 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
if (c != nullptr) {
if (!compaction_released) {
c->ReleaseCompactionFiles(status);
} else {
#ifndef NDEBUG
// Sanity checking that compaction files are freed.
for (size_t i = 0; i < c->num_input_levels(); i++) {
for (size_t j = 0; j < c->inputs(i)->size(); j++) {
assert(!c->input(i, j)->being_compacted);
}
}
std::unordered_set<Compaction*>* cip = c->column_family_data()
->compaction_picker()
->compactions_in_progress();
assert(cip->find(c.get()) == cip->end());
#endif
}
*made_progress = true;
// Need to make sure SstFileManager does its bookkeeping

View File

@ -1174,7 +1174,8 @@ class VersionSet {
const MutableCFOptions& mutable_cf_options,
const ReadOptions& read_options, VersionEdit* edit, InstrumentedMutex* mu,
FSDirectory* dir_contains_current_file, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
const ColumnFamilyOptions* column_family_options = nullptr,
const std::function<void(const Status&)>& manifest_wcb = {}) {
autovector<ColumnFamilyData*> cfds;
cfds.emplace_back(column_family_data);
autovector<const MutableCFOptions*> mutable_cf_options_list;
@ -1185,7 +1186,7 @@ class VersionSet {
edit_lists.emplace_back(edit_list);
return LogAndApply(cfds, mutable_cf_options_list, read_options, edit_lists,
mu, dir_contains_current_file, new_descriptor_log,
column_family_options);
column_family_options, {manifest_wcb});
}
// The batch version. If edit_list.size() > 1, caller must ensure that
// no edit in the list column family add or drop

View File

@ -0,0 +1 @@
* Fix a bug (Issue #10257) where DB can hang after write stall since no compaction is scheduled (#11764).