diff --git a/db/db_impl.h b/db/db_impl.h index fc80fe39a8..d6ac0bbcdc 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -928,6 +928,9 @@ class DBImpl : public DB { Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); + bool EnoughRoomForCompaction(const std::vector& inputs, + bool* sfm_bookkeeping, LogBuffer* log_buffer); + void PrintStatistics(); // dump rocksdb.stats to LOG diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 3a243263d6..230b1f57bb 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -24,6 +24,32 @@ namespace rocksdb { +bool DBImpl::EnoughRoomForCompaction( + const std::vector& inputs, + bool* sfm_reserved_compact_space, LogBuffer* log_buffer) { + // Check if we have enough room to do the compaction + bool enough_room = true; +#ifndef ROCKSDB_LITE + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + if (sfm) { + enough_room = sfm->EnoughRoomForCompaction(inputs); + if (enough_room) { + *sfm_reserved_compact_space = true; + } + } +#endif // ROCKSDB_LITE + if (!enough_room) { + // Just in case tests want to change the value of enough_room + TEST_SYNC_POINT_CALLBACK( + "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room); + ROCKS_LOG_BUFFER(log_buffer, + "Cancelled compaction because not enough room"); + RecordTick(stats_, COMPACTION_CANCELLED, 1); + } + return enough_room; +} + Status DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); @@ -582,6 +608,16 @@ Status DBImpl::CompactFilesImpl( "files are already being compacted"); } } + bool sfm_reserved_compact_space = false; + // First check if we have enough room to do the compaction + bool enough_room = EnoughRoomForCompaction( + input_files, &sfm_reserved_compact_space, log_buffer); + + if (!enough_room) { + // m's vars will get set properly at the end of this function, + // as long as status == CompactionTooLarge + return Status::CompactionTooLarge(); + } // At this point, CompactFiles will be run. bg_compaction_scheduled_++; @@ -658,6 +694,14 @@ Status DBImpl::CompactFilesImpl( *c->mutable_cf_options(), FlushReason::kManualCompaction); } c->ReleaseCompactionFiles(s); +#ifndef ROCKSDB_LITE + // Need to make sure SstFileManager does its bookkeeping + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + if (sfm && sfm_reserved_compact_space) { + sfm->OnCompactionCompletion(c.get()); + } +#endif // ROCKSDB_LITE ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); @@ -696,6 +740,7 @@ Status DBImpl::CompactFilesImpl( if (bg_compaction_scheduled_ == 0) { bg_cv_.SignalAll(); } + TEST_SYNC_POINT("CompactFilesImpl:End"); return status; } @@ -1578,9 +1623,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; -#ifndef ROCKSDB_LITE - bool sfm_bookkeeping = false; -#endif // ROCKSDB_LITE + bool sfm_reserved_compact_space = false; if (is_manual) { ManualCompactionState* m = manual_compaction; assert(m->in_progress); @@ -1594,16 +1637,29 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)")); } else { - ROCKS_LOG_BUFFER( - log_buffer, - "[%s] Manual compaction from level-%d to level-%d from %s .. " - "%s; will stop at %s\n", - m->cfd->GetName().c_str(), m->input_level, c->output_level(), - (m->begin ? m->begin->DebugString().c_str() : "(begin)"), - (m->end ? m->end->DebugString().c_str() : "(end)"), - ((m->done || m->manual_end == nullptr) - ? "(end)" - : m->manual_end->DebugString().c_str())); + // First check if we have enough room to do the compaction + bool enough_room = EnoughRoomForCompaction( + *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + + if (!enough_room) { + // Then don't do the compaction + c->ReleaseCompactionFiles(status); + c.reset(); + // m's vars will get set properly at the end of this function, + // as long as status == CompactionTooLarge + status = Status::CompactionTooLarge(); + } else { + ROCKS_LOG_BUFFER( + log_buffer, + "[%s] Manual compaction from level-%d to level-%d from %s .. " + "%s; will stop at %s\n", + m->cfd->GetName().c_str(), m->input_level, c->output_level(), + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)"), + ((m->done || m->manual_end == nullptr) + ? "(end)" + : m->manual_end->DebugString().c_str())); + } } } else if (!is_prepicked && !compaction_queue_.empty()) { if (HasExclusiveManualCompaction()) { @@ -1644,24 +1700,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); - bool enough_room = true; if (c != nullptr) { -#ifndef ROCKSDB_LITE - auto sfm = static_cast( - immutable_db_options_.sst_file_manager.get()); - if (sfm) { - enough_room = sfm->EnoughRoomForCompaction(c.get()); - if (enough_room) { - sfm_bookkeeping = true; - } - } -#endif // ROCKSDB_LITE - if (!enough_room) { - // Just in case tests want to change the value of enough_room - TEST_SYNC_POINT_CALLBACK( - "DBImpl::BackgroundCompaction():CancelledCompaction", - &enough_room); - } + bool enough_room = EnoughRoomForCompaction( + *(c->inputs()), &sfm_reserved_compact_space, log_buffer); + if (!enough_room) { // Then don't do the compaction c->ReleaseCompactionFiles(status); @@ -1670,9 +1712,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ->storage_info() ->ComputeCompactionScore(*(c->immutable_cf_options()), *(c->mutable_cf_options())); - - ROCKS_LOG_BUFFER(log_buffer, - "Cancelled compaction because not enough room"); AddToCompactionQueue(cfd); ++unscheduled_compactions_; @@ -1680,7 +1719,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Don't need to sleep here, because BackgroundCallCompaction // will sleep if !s.ok() status = Status::CompactionTooLarge(); - RecordTick(stats_, COMPACTION_CANCELLED, 1); } else { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, @@ -1867,7 +1905,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Need to make sure SstFileManager does its bookkeeping auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); - if (sfm && sfm_bookkeeping) { + if (sfm && sfm_reserved_compact_space) { sfm->OnCompactionCompletion(c.get()); } #endif // ROCKSDB_LITE diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index a64f6a9245..ef896890cc 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -20,6 +20,35 @@ class DBSSTTest : public DBTestBase { DBSSTTest() : DBTestBase("/db_sst_test") {} }; +// A class which remembers the name of each flushed file. +class FlushedFileCollector : public EventListener { + public: + FlushedFileCollector() {} + ~FlushedFileCollector() {} + + virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + std::lock_guard lock(mutex_); + flushed_files_.push_back(info.file_path); + } + + std::vector GetFlushedFiles() { + std::lock_guard lock(mutex_); + std::vector result; + for (auto fname : flushed_files_) { + result.push_back(fname); + } + return result; + } + void ClearFlushedFiles() { + std::lock_guard lock(mutex_); + flushed_files_.clear(); + } + + private: + std::vector flushed_files_; + std::mutex mutex_; +}; + TEST_F(DBSSTTest, DontDeletePendingOutputs) { Options options; options.env = env_; @@ -559,6 +588,7 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) { sfm->SetMaxAllowedSpaceUsage(0); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); }); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", @@ -584,6 +614,8 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) { ASSERT_OK(Flush()); dbfull()->TEST_WaitForCompact(true); + // Because we set a callback in CancelledCompaction, we actually + // let the compaction run ASSERT_GT(completed_compactions, 0); ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); // Make sure the stat is bumped @@ -591,6 +623,78 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBSSTTest, CancellingManualCompactionsWorks) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.statistics = CreateDBStatistics(); + + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DestroyAndReopen(options); + + Random rnd(301); + + // Generate a file containing 10 keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + uint64_t total_file_size = 0; + auto files_in_db = GetAllSSTFiles(&total_file_size); + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); + + // Generate another file to trigger compaction. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + // OK, now trigger a manual compaction + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + // Make sure the stat is bumped + ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount( + COMPACTION_CANCELLED), + 1); + + // Now make sure CompactFiles also gets cancelled + auto l0_files = collector->GetFlushedFiles(); + dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0); + + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount( + COMPACTION_CANCELLED), + 2); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + + // Now let the flush through and make sure GetCompactionsReservedSize + // returns to normal + sfm->SetMaxAllowedSpaceUsage(0); + int completed_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactFilesImpl:End", [&](void* arg) { completed_compactions++; }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + ASSERT_GT(completed_compactions, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { // This test will set a maximum allowed space for the DB, then it will // keep filling the DB until the limit is reached and bg_error_ is set. diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index 07d27695ca..ed8f61f0b9 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -106,13 +106,14 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() { max_allowed_space_; } -bool SstFileManagerImpl::EnoughRoomForCompaction(Compaction* c) { +bool SstFileManagerImpl::EnoughRoomForCompaction( + const std::vector& inputs) { MutexLock l(&mu_); uint64_t size_added_by_compaction = 0; // First check if we even have the space to do the compaction - for (size_t i = 0; i < c->num_input_levels(); i++) { - for (size_t j = 0; j < c->num_input_files(i); j++) { - FileMetaData* filemeta = c->input(i, j); + for (size_t i = 0; i < inputs.size(); i++) { + for (size_t j = 0; j < inputs[i].size(); j++) { + FileMetaData* filemeta = inputs[i][j]; size_added_by_compaction += filemeta->fd.GetFileSize(); } } diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index b7a557d998..5d22725b31 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -67,7 +67,7 @@ class SstFileManagerImpl : public SstFileManager { // estimates how much space is currently being used by compactions (i.e. // if a compaction has started, this function bumps the used space by // the full compaction size). - bool EnoughRoomForCompaction(Compaction* c); + bool EnoughRoomForCompaction(const std::vector& inputs); // Bookkeeping so total_file_sizes_ goes back to normal after compaction // finishes