diff --git a/db/db_impl.cc b/db/db_impl.cc index f4ff976255..0a39387c84 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -260,7 +260,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) wal_manager_(db_options_, env_options_), #endif // ROCKSDB_LITE event_logger_(db_options_.info_log.get()), - bg_work_gate_closed_(false), + bg_work_paused_(0), refitting_level_(false), opened_successfully_(false) { env_->GetAbsolutePath(dbname, &db_absolute_path_); @@ -1548,7 +1548,13 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, } if (options.change_level) { - s = ReFitLevel(cfd, final_output_level, options.target_level); + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[RefitLevel] waiting for background threads to stop"); + s = PauseBackgroundWork(); + if (s.ok()) { + s = ReFitLevel(cfd, final_output_level, options.target_level); + } + ContinueBackgroundWork(); } LogFlush(db_options_.info_log); @@ -1747,6 +1753,25 @@ Status DBImpl::CompactFilesImpl( } #endif // ROCKSDB_LITE +Status DBImpl::PauseBackgroundWork() { + InstrumentedMutexLock guard_lock(&mutex_); + bg_work_paused_++; + while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) { + bg_cv_.Wait(); + } + return Status::OK(); +} + +Status DBImpl::ContinueBackgroundWork() { + InstrumentedMutexLock guard_lock(&mutex_); + assert(bg_work_paused_ > 0); + bg_work_paused_--; + if (bg_work_paused_ == 0) { + MaybeScheduleFlushOrCompaction(); + } + return Status::OK(); +} + void DBImpl::NotifyOnCompactionCompleted( ColumnFamilyData* cfd, Compaction *c, const Status &st, const CompactionJobStats& compaction_job_stats, @@ -1857,14 +1882,18 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, return minimum_level; } +// REQUIREMENT: block all background work by calling PauseBackgroundWork() +// before calling this function Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { assert(level < cfd->NumberLevels()); if (target_level >= cfd->NumberLevels()) { return Status::InvalidArgument("Target level exceeds number of levels"); } - SuperVersion* superversion_to_free = nullptr; - SuperVersion* new_superversion = new SuperVersion(); + std::unique_ptr superversion_to_free; + std::unique_ptr new_superversion(new SuperVersion()); + + Status status; InstrumentedMutexLock guard_lock(&mutex_); @@ -1872,40 +1901,26 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { if (refitting_level_) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[ReFitLevel] another thread is refitting"); - delete new_superversion; return Status::NotSupported("another thread is refitting"); } refitting_level_ = true; - // wait for all background threads to stop - bg_work_gate_closed_ = true; - while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) { - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[RefitLevel] waiting for background threads to stop: %d %d", - bg_compaction_scheduled_, bg_flush_scheduled_); - bg_cv_.Wait(); - } - - const MutableCFOptions mutable_cf_options = - *cfd->GetLatestMutableCFOptions(); + const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); // move to a smaller level int to_level = target_level; if (target_level < 0) { to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); } - Status status; auto* vstorage = cfd->current()->storage_info(); if (to_level > level) { if (level == 0) { - delete new_superversion; return Status::NotSupported( "Cannot change from level 0 to other levels."); } // Check levels are empty for a trivial move for (int l = level + 1; l <= to_level; l++) { if (vstorage->NumLevelFiles(l) > 0) { - delete new_superversion; return Status::NotSupported( "Levels between source and target are not empty for a move."); } @@ -1913,8 +1928,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { } if (to_level != level) { Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "[%s] Before refitting:\n%s", - cfd->GetName().c_str(), cfd->current()->DebugString().data()); + "[%s] Before refitting:\n%s", cfd->GetName().c_str(), + cfd->current()->DebugString().data()); VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); @@ -1926,14 +1941,13 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { f->marked_for_compaction); } Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "[%s] Apply version edit:\n%s", - cfd->GetName().c_str(), edit.DebugString().data()); + "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), + edit.DebugString().data()); status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); - superversion_to_free = InstallSuperVersionAndScheduleWork( - cfd, new_superversion, mutable_cf_options); - new_superversion = nullptr; + superversion_to_free.reset(InstallSuperVersionAndScheduleWork( + cfd, new_superversion.release(), mutable_cf_options)); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), @@ -1941,16 +1955,13 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { if (status.ok()) { Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "[%s] After refitting:\n%s", - cfd->GetName().c_str(), cfd->current()->DebugString().data()); + "[%s] After refitting:\n%s", cfd->GetName().c_str(), + cfd->current()->DebugString().data()); } } refitting_level_ = false; - bg_work_gate_closed_ = false; - delete superversion_to_free; - delete new_superversion; return status; } @@ -2203,8 +2214,8 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // Compaction may introduce data race to DB open return; } - if (bg_work_gate_closed_) { - // gate closed for background work + if (bg_work_paused_ > 0) { + // we paused the background work return; } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions diff --git a/db/db_impl.h b/db/db_impl.h index 634d585770..3f2f97bf6c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -139,6 +139,9 @@ class DBImpl : public DB { const int output_level, const int output_path_id = -1) override; + virtual Status PauseBackgroundWork() override; + virtual Status ContinueBackgroundWork() override; + using DB::SetOptions; Status SetOptions( ColumnFamilyHandle* column_family, @@ -746,8 +749,8 @@ class DBImpl : public DB { // Unified interface for logging events EventLogger event_logger_; - // A value of true temporarily disables scheduling of background work - bool bg_work_gate_closed_; + // A value of >0 temporarily disables scheduling of background work + int bg_work_paused_; // Guard against multiple concurrent refitting bool refitting_level_; diff --git a/db/db_test.cc b/db/db_test.cc index 92c6076b43..9b3a3d4bf1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5664,6 +5664,14 @@ class ModelDB: public DB { return Status::NotSupported("Not supported operation."); } + Status PauseBackgroundWork() override { + return Status::NotSupported("Not supported operation."); + } + + Status ContinueBackgroundWork() override { + return Status::NotSupported("Not supported operation."); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return 1; @@ -9634,6 +9642,33 @@ TEST_F(DBTest, AddExternalSstFileMultiThreaded) { INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, ::testing::Values(1, 4)); +TEST_F(DBTest, PauseBackgroundWorkTest) { + Options options; + options.write_buffer_size = 100000; // Small write buffer + options = CurrentOptions(options); + Reopen(options); + + std::vector threads; + std::atomic done; + db_->PauseBackgroundWork(); + threads.emplace_back([&]() { + Random rnd(301); + for (int i = 0; i < 10000; ++i) { + Put(RandomString(&rnd, 10), RandomString(&rnd, 10)); + } + done.store(true); + }); + env_->SleepForMicroseconds(200000); + // make sure the thread is not done + ASSERT_EQ(false, done.load()); + db_->ContinueBackgroundWork(); + for (auto& t : threads) { + t.join(); + } + // now it's done + ASSERT_EQ(true, done.load()); +} + } // namespace rocksdb #endif diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 7038a781a2..5a49638bd1 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -524,6 +524,13 @@ class DB { return CompactFiles(compact_options, DefaultColumnFamily(), input_file_names, output_level, output_path_id); } + + // This function will wait until all currently running background processes + // finish. After it returns, no background process will be run until + // UnblockBackgroundWork is called + virtual Status PauseBackgroundWork() = 0; + virtual Status ContinueBackgroundWork() = 0; + // Number of levels used for this DB. virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 95eddc62dd..aef192b07e 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -169,6 +169,13 @@ class StackableDB : public DB { output_level, output_path_id); } + virtual Status PauseBackgroundWork() override { + return db_->PauseBackgroundWork(); + } + virtual Status ContinueBackgroundWork() override { + return db_->ContinueBackgroundWork(); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return db_->NumberLevels(column_family);