diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 310b5df532..108f7d87a2 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -62,6 +62,46 @@ namespace rocksdb { +const char* GetCompactionReasonString(CompactionReason compaction_reason) { + switch (compaction_reason) { + case CompactionReason::kUnknown: + return "Unknown"; + case CompactionReason::kLevelL0FilesNum: + return "LevelL0FilesNum"; + case CompactionReason::kLevelMaxLevelSize: + return "LevelMaxLevelSize"; + case CompactionReason::kUniversalSizeAmplification: + return "UniversalSizeAmplification"; + case CompactionReason::kUniversalSizeRatio: + return "UniversalSizeRatio"; + case CompactionReason::kUniversalSortedRunNum: + return "UniversalSortedRunNum"; + case CompactionReason::kFIFOMaxSize: + return "FIFOMaxSize"; + case CompactionReason::kFIFOReduceNumFiles: + return "FIFOReduceNumFiles"; + case CompactionReason::kFIFOTtl: + return "FIFOTtl"; + case CompactionReason::kManualCompaction: + return "ManualCompaction"; + case CompactionReason::kFilesMarkedForCompaction: + return "FilesMarkedForCompaction"; + case CompactionReason::kBottommostFiles: + return "BottommostFiles"; + case CompactionReason::kTtl: + return "Ttl"; + case CompactionReason::kFlush: + return "Flush"; + case CompactionReason::kExternalSstIngestion: + return "ExternalSstIngestion"; + case CompactionReason::kNumOfReasons: + // fall through + default: + assert(false); + return "Invalid"; + } +} + // Maintains state for each sub-compaction struct CompactionJob::SubcompactionState { const Compaction* compaction; @@ -276,7 +316,7 @@ CompactionJob::CompactionJob( : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), - compaction_stats_(1), + compaction_stats_(compaction->compaction_reason(), 1), dbname_(dbname), db_options_(db_options), env_options_(env_options), @@ -1498,8 +1538,9 @@ void CompactionJob::LogCompaction() { cfd->GetName().c_str(), scratch); // build event logger report auto stream = event_logger_->Log(); - stream << "job" << job_id_ << "event" - << "compaction_started"; + stream << "job" << job_id_ << "event" << "compaction_started" + << "compaction_reason" + << GetCompactionReasonString(compaction->compaction_reason()); for (size_t i = 0; i < compaction->num_input_levels(); ++i) { stream << ("files_L" + ToString(compaction->level(i))); stream.StartArray(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index b4790ef0b5..7d427b7f40 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -74,6 +74,48 @@ class FlushedFileCollector : public EventListener { std::mutex mutex_; }; +class CompactionStatsCollector : public EventListener { +public: + CompactionStatsCollector() + : compaction_completed_(static_cast(CompactionReason::kNumOfReasons)) { + for (auto& v : compaction_completed_) { + v.store(0); + } + } + + ~CompactionStatsCollector() {} + + virtual void OnCompactionCompleted(DB* /* db */, + const CompactionJobInfo& info) override { + int k = static_cast(info.compaction_reason); + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + assert(k >= 0 && k < num_of_reasons); + compaction_completed_[k]++; + } + + virtual void OnExternalFileIngested(DB* /* db */, + const ExternalFileIngestionInfo& /* info */) override { + int k = static_cast(CompactionReason::kExternalSstIngestion); + compaction_completed_[k]++; + } + + virtual void OnFlushCompleted(DB* /* db */, + const FlushJobInfo& /* info */) override { + int k = static_cast(CompactionReason::kFlush); + compaction_completed_[k]++; + } + + int NumberOfCompactions(CompactionReason reason) const { + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + int k = static_cast(reason); + assert(k >= 0 && k < num_of_reasons); + return compaction_completed_.at(k).load(); + } + +private: + std::vector> compaction_completed_; +}; + static const int kCDTValueSize = 1000; static const int kCDTKeysPerBuffer = 4; static const int kCDTNumLevels = 8; @@ -154,6 +196,40 @@ void VerifyCompactionResult( #endif } +/* + * Verifies compaction stats of cfd are valid. + * + * For each level of cfd, its compaction stats are valid if + * 1) sum(stat.counts) == stat.count, and + * 2) stat.counts[i] == collector.NumberOfCompactions(i) + */ +void VerifyCompactionStats(ColumnFamilyData& cfd, + const CompactionStatsCollector& collector) { +#ifndef NDEBUG + InternalStats* internal_stats_ptr = cfd.internal_stats(); + ASSERT_TRUE(internal_stats_ptr != nullptr); + const std::vector& comp_stats = + internal_stats_ptr->TEST_GetCompactionStats(); + const int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + std::vector counts(num_of_reasons, 0); + // Count the number of compactions caused by each CompactionReason across + // all levels. + for (const auto& stat : comp_stats) { + int sum = 0; + for (int i = 0; i < num_of_reasons; i++) { + counts[i] += stat.counts[i]; + sum += stat.counts[i]; + } + ASSERT_EQ(sum, stat.count); + } + // Verify InternalStats bookkeeping matches that of CompactionStatsCollector, + // assuming that all compactions complete. + for (int i = 0; i < num_of_reasons; i++) { + ASSERT_EQ(collector.NumberOfCompactions(static_cast(i)), counts[i]); + } +#endif /* NDEBUG */ +} + const SstFileMetaData* PickFileRandomly( const ColumnFamilyMetaData& cf_meta, Random* rand, @@ -3562,6 +3638,28 @@ TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) { } } +TEST_F(DBCompactionTest, CompactionStatsTest) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + CompactionStatsCollector* collector = new CompactionStatsCollector(); + options.listeners.emplace_back(collector); + DestroyAndReopen(options); + + for (int i = 0; i < 32; i++) { + for (int j = 0; j < 5000; j++) { + Put(std::to_string(j), std::string(1, 'A')); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + } + dbfull()->TEST_WaitForCompact(); + ColumnFamilyHandleImpl* cfh = + static_cast(dbfull()->DefaultColumnFamily()); + ColumnFamilyData* cfd = cfh->cfd(); + + VerifyCompactionStats(*cfd, *collector); +} + INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, ::testing::Values(std::make_tuple(1, true), std::make_tuple(1, false), diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 5830f0474e..2dd966622b 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -972,7 +972,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, meta.marked_for_compaction); } - InternalStats::CompactionStats stats(1); + InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.fd.GetFileSize(); stats.num_output_files = 1; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 91ef46988b..98782ed370 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -218,7 +218,7 @@ void ExternalSstFileIngestionJob::UpdateStats() { uint64_t total_l0_files = 0; uint64_t total_time = env_->NowMicros() - job_start_time_; for (IngestedFileInfo& f : files_to_ingest_) { - InternalStats::CompactionStats stats(1); + InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); stats.micros = total_time; stats.bytes_written = f.fd.GetFileSize(); stats.num_output_files = 1; diff --git a/db/flush_job.cc b/db/flush_job.cc index f01565697e..b53b229e42 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() { } // Note that here we treat flush as level 0 compaction in internal stats - InternalStats::CompactionStats stats(1); + InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); stats.micros = db_options_.env->NowMicros() - start_micros; stats.bytes_written = meta_.fd.GetFileSize(); MeasureTime(stats_, FLUSH_TIME, stats.micros); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index b3cf238081..46040382e9 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -957,7 +957,7 @@ void InternalStats::DumpDBStats(std::string* value) { */ void InternalStats::DumpCFMapStats( std::map* cf_stats) { - CompactionStats compaction_stats_sum(0); + CompactionStats compaction_stats_sum; std::map> levels_stats; DumpCFMapStats(&levels_stats, &compaction_stats_sum); for (auto const& level_ent : levels_stats) { @@ -1088,7 +1088,7 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { // Print stats for each level std::map> levels_stats; - CompactionStats compaction_stats_sum(0); + CompactionStats compaction_stats_sum; DumpCFMapStats(&levels_stats, &compaction_stats_sum); for (int l = 0; l < number_levels_; ++l) { if (levels_stats.find(l) != levels_stats.end()) { diff --git a/db/internal_stats.h b/db/internal_stats.h index 481c6d32f9..aaefa72d4b 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -163,7 +163,10 @@ class InternalStats { // Number of compactions done int count; - explicit CompactionStats(int _count = 0) + // Number of compactions done per CompactionReason + int counts[static_cast(CompactionReason::kNumOfReasons)]; + + explicit CompactionStats() : micros(0), bytes_read_non_output_levels(0), bytes_read_output_level(0), @@ -174,7 +177,36 @@ class InternalStats { num_output_files(0), num_input_records(0), num_dropped_records(0), - count(_count) {} + count(0) { + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i < num_of_reasons; i++) { + counts[i] = 0; + } + } + + explicit CompactionStats(CompactionReason reason, int c) + : micros(0), + bytes_read_non_output_levels(0), + bytes_read_output_level(0), + bytes_written(0), + bytes_moved(0), + num_input_files_in_non_output_levels(0), + num_input_files_in_output_level(0), + num_output_files(0), + num_input_records(0), + num_dropped_records(0), + count(c) { + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i < num_of_reasons; i++) { + counts[i] = 0; + } + int r = static_cast(reason); + if (r >= 0 && r < num_of_reasons) { + counts[r] = c; + } else { + count = 0; + } + } explicit CompactionStats(const CompactionStats& c) : micros(c.micros), @@ -189,7 +221,12 @@ class InternalStats { num_output_files(c.num_output_files), num_input_records(c.num_input_records), num_dropped_records(c.num_dropped_records), - count(c.count) {} + count(c.count) { + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i < num_of_reasons; i++) { + counts[i] = c.counts[i]; + } + } void Clear() { this->micros = 0; @@ -203,6 +240,10 @@ class InternalStats { this->num_input_records = 0; this->num_dropped_records = 0; this->count = 0; + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i < num_of_reasons; i++) { + counts[i] = 0; + } } void Add(const CompactionStats& c) { @@ -219,6 +260,10 @@ class InternalStats { this->num_input_records += c.num_input_records; this->num_dropped_records += c.num_dropped_records; this->count += c.count; + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i< num_of_reasons; i++) { + counts[i] += c.counts[i]; + } } void Subtract(const CompactionStats& c) { @@ -235,6 +280,10 @@ class InternalStats { this->num_input_records -= c.num_input_records; this->num_dropped_records -= c.num_dropped_records; this->count -= c.count; + int num_of_reasons = static_cast(CompactionReason::kNumOfReasons); + for (int i = 0; i < num_of_reasons; i++) { + counts[i] -= c.counts[i]; + } } }; @@ -307,6 +356,10 @@ class InternalStats { bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info, Version* version, uint64_t* value); + const std::vector& TEST_GetCompactionStats() const { + return comp_stats_; + } + // Store a mapping from the user-facing DB::Properties string to our // DBPropertyInfo struct used internally for retrieving properties. static const std::unordered_map ppt_name_to_info; @@ -350,8 +403,7 @@ class InternalStats { uint64_t ingest_keys_addfile; // Total number of keys ingested CFStatsSnapshot() - : comp_stats(0), - ingest_bytes_flush(0), + : ingest_bytes_flush(0), stall_count(0), compact_bytes_write(0), compact_bytes_read(0), @@ -543,7 +595,9 @@ class InternalStats { uint64_t num_dropped_records; int count; - explicit CompactionStats(int _count = 0) {} + explicit CompactionStats() {} + + explicit CompactionStats(CompactionReason reason, int c) {} explicit CompactionStats(const CompactionStats& c) {} diff --git a/db/obsolete_files_test.cc b/db/obsolete_files_test.cc index dbee89a644..26bbf5639b 100644 --- a/db/obsolete_files_test.cc +++ b/db/obsolete_files_test.cc @@ -53,7 +53,7 @@ class ObsoleteFilesTest : public testing::Test { options_.max_bytes_for_level_base = 1024*1024*1000; options_.WAL_ttl_seconds = 300; // Used to test log files options_.WAL_size_limit_MB = 1024; // Used to test log files - dbname_ = test::TmpDir() + "/double_deletefile_test"; + dbname_ = test::TmpDir() + "/obsolete_files_test"; options_.wal_dir = dbname_ + "/wal_files"; // clean up all the files that might have been there before diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 59bada079a..601951cd0f 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -55,8 +55,8 @@ struct TableFileCreationInfo : public TableFileCreationBriefInfo { Status status; }; -enum class CompactionReason { - kUnknown, +enum class CompactionReason : int { + kUnknown = 0, // [Level] number of L0 files > level0_file_num_compaction_trigger kLevelL0FilesNum, // [Level] total size of level > MaxBytesForLevel() @@ -80,7 +80,15 @@ enum class CompactionReason { // [Level] Automatic compaction within bottommost level to cleanup duplicate // versions of same user key, usually due to a released snapshot. kBottommostFiles, + // Compaction based on TTL kTtl, + // According to the comments in flush_job.cc, RocksDB treats flush as + // a level 0 compaction in internal stats. + kFlush, + // Compaction caused by external sst file ingestion + kExternalSstIngestion, + // total number of compaction reasons, new reasons must be added above this. + kNumOfReasons, }; enum class FlushReason : int {