diff --git a/HISTORY.md b/HISTORY.md index d9e08b4134..9efb85ec5f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * Fixed a potential timer crash when open close DB concurrently. * Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled. * Fixed a bug that `Iterator::Refresh()` reads stale keys after DeleteRange() performed. +* Fixed a race condition when disable and re-enable manual compaction. ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index fd0eb66eb3..03502157d6 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6973,8 +6973,7 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { {{"DBImpl::BGWorkCompaction", "DBCompactionTest::DisableJustStartedManualCompaction:" "PreDisableManualCompaction"}, - {"DBCompactionTest::DisableJustStartedManualCompaction:" - "ManualCompactionReturn", + {"DBImpl::RunManualCompaction:Unscheduled", "BackgroundCallCompaction:0"}}); SyncPoint::GetInstance()->EnableProcessing(); @@ -6983,9 +6982,6 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { cro.exclusive_manual_compaction = true; auto s = db_->CompactRange(cro, nullptr, nullptr); ASSERT_TRUE(s.IsIncomplete()); - TEST_SYNC_POINT( - "DBCompactionTest::DisableJustStartedManualCompaction:" - "ManualCompactionReturn"); }); TEST_SYNC_POINT( "DBCompactionTest::DisableJustStartedManualCompaction:" @@ -6995,6 +6991,43 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { compact_thread.join(); } +TEST_F(DBCompactionTest, DisableInProgressManualCompaction) { + const int kNumL0Files = 4; + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kNumL0Files; + Reopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCompaction:InProgress", + "DBCompactionTest::DisableInProgressManualCompaction:" + "PreDisableManualCompaction"}, + {"DBImpl::RunManualCompaction:Unscheduled", + "CompactionJob::Run():Start"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // generate files, but avoid trigger auto compaction + for (int i = 0; i < kNumL0Files / 2; i++) { + ASSERT_OK(Put(Key(1), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + } + + port::Thread compact_thread([&]() { + CompactRangeOptions cro; + cro.exclusive_manual_compaction = true; + auto s = db_->CompactRange(cro, nullptr, nullptr); + ASSERT_TRUE(s.IsIncomplete()); + }); + + TEST_SYNC_POINT( + "DBCompactionTest::DisableInProgressManualCompaction:" + "PreDisableManualCompaction"); + db_->DisableManualCompaction(); + + compact_thread.join(); +} + TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { const int kNumL0Files = 4; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 320c074636..bc0117ecd1 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1550,8 +1550,7 @@ class DBImpl : public DB { Compaction* compaction; // caller retains ownership of `manual_compaction_state` as it is reused // across background compactions. - std::shared_ptr - manual_compaction_state; // nullptr if non-manual + ManualCompactionState* manual_compaction_state; // nullptr if non-manual // task limiter token is requested during compaction picking. std::unique_ptr task_token; }; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 44da77f7e1..ab7dc10e32 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1796,10 +1796,11 @@ Status DBImpl::RunManualCompaction( CompactionArg* ca = nullptr; bool scheduled = false; + bool unscheduled = false; Env::Priority thread_pool_priority = Env::Priority::TOTAL; bool manual_conflict = false; - auto manual = std::make_shared( + ManualCompactionState manual( cfd, input_level, output_level, compact_range_options.target_path_id, exclusive, disallow_trivial_move, compact_range_options.canceled); // For universal compaction, we enforce every manual compaction to compact @@ -1807,18 +1808,18 @@ Status DBImpl::RunManualCompaction( if (begin == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual->begin = nullptr; + manual.begin = nullptr; } else { begin_storage.SetMinPossibleForUserKey(*begin); - manual->begin = &begin_storage; + manual.begin = &begin_storage; } if (end == nullptr || cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - manual->end = nullptr; + manual.end = nullptr; } else { end_storage.SetMaxPossibleForUserKey(*end); - manual->end = &end_storage; + manual.end = &end_storage; } TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); @@ -1830,10 +1831,10 @@ Status DBImpl::RunManualCompaction( // `DisableManualCompaction()` just waited for the manual compaction queue // to drain. So return immediately. TEST_SYNC_POINT("DBImpl::RunManualCompaction:PausedAtStart"); - manual->status = + manual.status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); - manual->done = true; - return manual->status; + manual.done = true; + return manual.status; } // When a manual compaction arrives, temporarily disable scheduling of @@ -1853,7 +1854,7 @@ Status DBImpl::RunManualCompaction( // However, only one of them will actually schedule compaction, while // others will wait on a condition variable until it completes. - AddManualCompaction(manual.get()); + AddManualCompaction(&manual); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); if (exclusive) { // Limitation: there's no way to wake up the below loop when user sets @@ -1862,11 +1863,11 @@ Status DBImpl::RunManualCompaction( while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) { if (manual_compaction_paused_ > 0 || - (manual->canceled != nullptr && *manual->canceled == true)) { + (manual.canceled != nullptr && *manual.canceled == true)) { // Pretend the error came from compaction so the below cleanup/error // handling code can process it. - manual->done = true; - manual->status = + manual.done = true; + manual.status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); break; } @@ -1888,64 +1889,63 @@ Status DBImpl::RunManualCompaction( // We don't check bg_error_ here, because if we get the error in compaction, // the compaction will set manual.status to bg_error_ and set manual.done to // true. - while (!manual->done) { + while (!manual.done) { assert(HasPendingManualCompaction()); manual_conflict = false; Compaction* compaction = nullptr; - if (ShouldntRunManualCompaction(manual.get()) || - (manual->in_progress == true) || scheduled || - (((manual->manual_end = &manual->tmp_storage1) != nullptr) && - ((compaction = manual->cfd->CompactRange( - *manual->cfd->GetLatestMutableCFOptions(), mutable_db_options_, - manual->input_level, manual->output_level, compact_range_options, - manual->begin, manual->end, &manual->manual_end, - &manual_conflict, max_file_num_to_ignore, trim_ts)) == nullptr && + if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || + scheduled || + (((manual.manual_end = &manual.tmp_storage1) != nullptr) && + ((compaction = manual.cfd->CompactRange( + *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_, + manual.input_level, manual.output_level, compact_range_options, + manual.begin, manual.end, &manual.manual_end, &manual_conflict, + max_file_num_to_ignore, trim_ts)) == nullptr && manual_conflict))) { // exclusive manual compactions should not see a conflict during // CompactRange assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); - if (manual_compaction_paused_ > 0) { - manual->done = true; - manual->status = - Status::Incomplete(Status::SubCode::kManualCompactionPaused); - if (scheduled) { - assert(thread_pool_priority != Env::Priority::TOTAL); - auto unscheduled_task_num = env_->UnSchedule( - GetTaskTag(TaskType::kManualCompaction), thread_pool_priority); - if (unscheduled_task_num > 0) { - ROCKS_LOG_INFO( - immutable_db_options_.info_log, - "[%s] Unscheduled %d number of manual compactions from the " - "thread-pool", - cfd->GetName().c_str(), unscheduled_task_num); - } + if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) { + assert(thread_pool_priority != Env::Priority::TOTAL); + // unschedule all manual compactions + auto unscheduled_task_num = env_->UnSchedule( + GetTaskTag(TaskType::kManualCompaction), thread_pool_priority); + if (unscheduled_task_num > 0) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] Unscheduled %d number of manual compactions from the " + "thread-pool", + cfd->GetName().c_str(), unscheduled_task_num); + // it may unschedule other manual compactions, notify others. + bg_cv_.SignalAll(); } - break; + unscheduled = true; + TEST_SYNC_POINT("DBImpl::RunManualCompaction:Unscheduled"); } - if (scheduled && manual->incomplete == true) { - assert(!manual->in_progress); + if (scheduled && manual.incomplete == true) { + assert(!manual.in_progress); scheduled = false; - manual->incomplete = false; + manual.incomplete = false; } } else if (!scheduled) { if (compaction == nullptr) { - manual->done = true; + manual.done = true; bg_cv_.SignalAll(); continue; } ca = new CompactionArg; ca->db = this; ca->prepicked_compaction = new PrepickedCompaction; - ca->prepicked_compaction->manual_compaction_state = manual; + ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; if (!RequestCompactionToken( cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) { // Don't throttle manual compaction, only count outstanding tasks. assert(false); } - manual->incomplete = false; + manual.incomplete = false; if (compaction->bottommost_level() && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { bg_bottom_compaction_scheduled_++; @@ -1969,18 +1969,18 @@ Status DBImpl::RunManualCompaction( } log_buffer.FlushBufferToLog(); - assert(!manual->in_progress); + assert(!manual.in_progress); assert(HasPendingManualCompaction()); - RemoveManualCompaction(manual.get()); + RemoveManualCompaction(&manual); // if the manual job is unscheduled, try schedule other jobs in case there's // any unscheduled compaction job which was blocked by exclusive manual // compaction. - if (manual->status.IsIncomplete() && - manual->status.subcode() == Status::SubCode::kManualCompactionPaused) { + if (manual.status.IsIncomplete() && + manual.status.subcode() == Status::SubCode::kManualCompactionPaused) { MaybeScheduleFlushOrCompaction(); } bg_cv_.SignalAll(); - return manual->status; + return manual.status; } void DBImpl::GenerateFlushRequest(const autovector& cfds, @@ -2706,6 +2706,12 @@ void DBImpl::UnscheduleCompactionCallback(void* arg) { CompactionArg ca = *(ca_ptr); delete reinterpret_cast(arg); if (ca.prepicked_compaction != nullptr) { + // if it's a manual compaction, set status to ManualCompactionPaused + if (ca.prepicked_compaction->manual_compaction_state) { + ca.prepicked_compaction->manual_compaction_state->done = true; + ca.prepicked_compaction->manual_compaction_state->status = + Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } if (ca.prepicked_compaction->compaction != nullptr) { ca.prepicked_compaction->compaction->ReleaseCompactionFiles( Status::Incomplete(Status::SubCode::kManualCompactionPaused)); @@ -2948,7 +2954,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, mutex_.Lock(); } else if (s.IsManualCompactionPaused()) { assert(prepicked_compaction); - auto m = prepicked_compaction->manual_compaction_state; + ManualCompactionState* m = prepicked_compaction->manual_compaction_state; assert(m); ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused", m->cfd->GetName().c_str(), job_context.job_id); @@ -3030,7 +3036,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, Env::Priority thread_pri) { - std::shared_ptr manual_compaction = + ManualCompactionState* manual_compaction = prepicked_compaction == nullptr ? nullptr : prepicked_compaction->manual_compaction_state; @@ -3074,10 +3080,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!status.ok()) { if (is_manual) { manual_compaction->status = status; - manual_compaction->status - .PermitUncheckedError(); // the manual compaction thread may exit - // first, which won't be able to check the - // status manual_compaction->done = true; manual_compaction->in_progress = false; manual_compaction = nullptr; @@ -3094,13 +3096,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->in_progress = true; } + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:InProgress"); + std::unique_ptr task_token; // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; bool sfm_reserved_compact_space = false; if (is_manual) { - auto m = manual_compaction; + ManualCompactionState* m = manual_compaction; assert(m->in_progress); if (!c) { m->done = true; @@ -3484,7 +3488,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c.reset(); if (is_manual) { - auto m = manual_compaction; + ManualCompactionState* m = manual_compaction; if (!status.ok()) { m->status = status; m->done = true;