mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-04 20:02:50 +00:00
Fix DisableManualCompaction()
hang (#12578)
Summary: Prior to this PR the following sequence could happen: 1. `RunManualCompaction()` A schedules compaction to thread pool and waits 2. `RunManualCompaction()` B waits without scheduling anything due to conflict 3. `DisableManualCompaction()` bumps `manual_compaction_paused_` and wakes up both 4. `RunManualCompaction()` A (`scheduled && !unscheduled`) unschedules its compaction and marks itself done 5. `RunManualCompaction()` B (`!scheduled && !unscheduled`) schedules compaction to thread pool 6. `RunManualCompaction()` B (`scheduled && !unscheduled`) waits on its compaction 7. `RunManualCompaction()` B at some point wakes up and finishes, either by unscheduling or by compaction execution 8. `DisableManualCompaction()` returns as there are no more manual compactions running Between 6. and 7. the wait can be long while the compaction sits in the thread pool queue. That wait is unnecessary. This PR changes the behavior from step 5. onward: 5'. `RunManualCompaction()` B (`!scheduled && !unscheduled`) marks itself done 6'. `DisableManualCompaction()` returns as there are no more manual compactions running Pull Request resolved: https://github.com/facebook/rocksdb/pull/12578 Reviewed By: cbi42 Differential Revision: D56528144 Pulled By: ajkr fbshipit-source-id: 4da2467376d7d4ff435547aa74dd8f118db0c03b
This commit is contained in:
parent
6f7cabeac8
commit
d2ebdea566
|
@ -3997,7 +3997,7 @@ TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
|
|||
Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
|
||||
TEST_F(DBCompactionTest, CancelCompactionWaitingOnRunningConflict) {
|
||||
// This test verifies cancellation of a compaction waiting to be scheduled due
|
||||
// to conflict with a running compaction.
|
||||
//
|
||||
|
@ -4036,7 +4036,7 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
|
|||
// Make sure the manual compaction has seen the conflict before being canceled
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"ColumnFamilyData::CompactRange:Return",
|
||||
"DBCompactionTest::CancelCompactionWaitingOnConflict:"
|
||||
"DBCompactionTest::CancelCompactionWaitingOnRunningConflict:"
|
||||
"PreDisableManualCompaction"}});
|
||||
auto manual_compaction_thread = port::Thread([this]() {
|
||||
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
|
||||
|
@ -4047,12 +4047,73 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
|
|||
// despite finding a conflict with an automatic compaction that is still
|
||||
// running
|
||||
TEST_SYNC_POINT(
|
||||
"DBCompactionTest::CancelCompactionWaitingOnConflict:"
|
||||
"DBCompactionTest::CancelCompactionWaitingOnRunningConflict:"
|
||||
"PreDisableManualCompaction");
|
||||
db_->DisableManualCompaction();
|
||||
manual_compaction_thread.join();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, CancelCompactionWaitingOnScheduledConflict) {
|
||||
// This test verifies cancellation of a compaction waiting to be scheduled due
|
||||
// to conflict with a scheduled (but not running) compaction.
|
||||
//
|
||||
// A `CompactRange()` in universal compacts all files, waiting for files to
|
||||
// become available if they are locked for another compaction. This test
|
||||
// blocks the compaction thread pool and then calls `CompactRange()` twice.
|
||||
// The first call to `CompactRange()` schedules a compaction that is queued
|
||||
// in the thread pool. The second call to `CompactRange()` blocks on the first
|
||||
// call due to the conflict in file picking. The test verifies that
|
||||
// `DisableManualCompaction()` can cancel both while the thread pool remains
|
||||
// blocked.
|
||||
const int kNumSortedRuns = 4;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.compaction_style = kCompactionStyleUniversal;
|
||||
options.disable_auto_compactions = true;
|
||||
options.memtable_factory.reset(
|
||||
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
|
||||
Reopen(options);
|
||||
|
||||
test::SleepingBackgroundTask sleeping_task_low;
|
||||
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
|
||||
Env::Priority::LOW);
|
||||
|
||||
// Fill overlapping files in L0
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < kNumSortedRuns; ++i) {
|
||||
int key_idx = 0;
|
||||
GenerateNewFile(&rnd, &key_idx, false /* nowait */);
|
||||
}
|
||||
|
||||
std::atomic<int> num_compact_range_calls{0};
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"ColumnFamilyData::CompactRange:Return",
|
||||
[&](void* /* arg */) { num_compact_range_calls++; });
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
const int kNumManualCompactions = 2;
|
||||
port::Thread manual_compaction_threads[kNumManualCompactions];
|
||||
for (int i = 0; i < kNumManualCompactions; i++) {
|
||||
manual_compaction_threads[i] = port::Thread([this]() {
|
||||
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
|
||||
.IsIncomplete());
|
||||
});
|
||||
}
|
||||
while (num_compact_range_calls < kNumManualCompactions) {
|
||||
}
|
||||
|
||||
// Cancel it. Threads should be joinable, i.e., both the scheduled and blocked
|
||||
// manual compactions were canceled despite no compaction could have ever run.
|
||||
db_->DisableManualCompaction();
|
||||
for (int i = 0; i < kNumManualCompactions; i++) {
|
||||
manual_compaction_threads[i].join();
|
||||
}
|
||||
|
||||
sleeping_task_low.WakeUp();
|
||||
sleeping_task_low.WaitUntilDone();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
|
||||
// Deletions can be dropped when compacted to non-last level if they fall
|
||||
// outside the lower-level files' key-ranges.
|
||||
|
|
|
@ -2154,16 +2154,6 @@ Status DBImpl::RunManualCompaction(
|
|||
manual.begin, manual.end, &manual.manual_end, &manual_conflict,
|
||||
max_file_num_to_ignore, trim_ts)) == nullptr &&
|
||||
manual_conflict))) {
|
||||
if (!scheduled) {
|
||||
// There is a conflicting compaction
|
||||
if (manual_compaction_paused_ > 0 || manual.canceled == true) {
|
||||
// Stop waiting since it was canceled. Pretend the error came from
|
||||
// compaction so the below cleanup/error handling code can process it.
|
||||
manual.done = true;
|
||||
manual.status =
|
||||
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
|
||||
}
|
||||
}
|
||||
if (!manual.done) {
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
|
@ -2238,6 +2228,17 @@ Status DBImpl::RunManualCompaction(
|
|||
*final_output_level = compaction->output_level();
|
||||
}
|
||||
}
|
||||
if (!scheduled) {
|
||||
// There is nothing scheduled to wait on, so any cancellation can end the
|
||||
// manual now.
|
||||
if (manual_compaction_paused_ > 0 || manual.canceled == true) {
|
||||
// Stop waiting since it was canceled. Pretend the error came from
|
||||
// compaction so the below cleanup/error handling code can process it.
|
||||
manual.done = true;
|
||||
manual.status =
|
||||
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log_buffer.FlushBufferToLog();
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
* Fixed hang in `DisableManualCompactions()` where compactions waiting to be scheduled due to conflicts would not be canceled promptly
|
Loading…
Reference in a new issue