diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index e286817e6f..f8a78bc9b6 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -50,7 +50,8 @@ void VerifyInitializationOfCompactionJobStats( ASSERT_EQ(compaction_job_stats.num_output_records, 0U); ASSERT_EQ(compaction_job_stats.num_output_files, 0U); - ASSERT_EQ(compaction_job_stats.is_manual_compaction, true); + ASSERT_TRUE(compaction_job_stats.is_manual_compaction); + ASSERT_FALSE(compaction_job_stats.is_remote_compaction); ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U); ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U); diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 8a8db33627..b34c7e662b 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -68,8 +68,11 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", compaction->column_family_data()->GetName().c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); - CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact), thread_pri_); + CompactionServiceJobInfo info( + dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact), + thread_pri_, compaction->compaction_reason(), + compaction->is_full_compaction(), compaction->is_manual_compaction(), + compaction->bottommost_level()); CompactionServiceScheduleResponse response = db_options_.compaction_service->Schedule(info, compaction_input_binary); switch (response.status) { @@ -333,6 +336,7 @@ Status CompactionServiceCompactionJob::Run() { // Build compaction result compaction_result_->output_level = compact_->compaction->output_level(); compaction_result_->output_path = output_path_; + compaction_result_->stats.is_remote_compaction = true; for (const auto& output_file : sub_compact->GetOutputs()) { auto& meta = output_file.meta; compaction_result_->output_files.emplace_back( @@ -527,6 +531,10 @@ static std::unordered_map {offsetof(struct CompactionJobStats, is_manual_compaction), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"is_remote_compaction", + {offsetof(struct CompactionJobStats, is_remote_compaction), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"total_input_bytes", {offsetof(struct CompactionJobStats, total_input_bytes), OptionType::kUInt64T, OptionVerificationType::kNormal, diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 8aacf2b6d2..bb53a4029b 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -21,8 +21,10 @@ class MyTestCompactionService : public CompactionService { : db_path_(std::move(db_path)), options_(options), statistics_(statistics), - start_info_("na", "na", "na", 0, Env::TOTAL), - wait_info_("na", "na", "na", 0, Env::TOTAL), + start_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown, + false, false, false), + wait_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown, + false, false, false), listeners_(listeners), table_properties_collector_factories_( std::move(table_properties_collector_factories)) {} @@ -97,8 +99,12 @@ class MyTestCompactionService : public CompactionService { Status s = DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id, compaction_input, result, options_override); - if (is_override_wait_result_) { - *result = override_wait_result_; + { + InstrumentedMutexLock l(&mutex_); + if (is_override_wait_result_) { + *result = override_wait_result_; + } + result_ = *result; } compaction_num_.fetch_add(1); if (s.ok()) { @@ -141,6 +147,10 @@ class MyTestCompactionService : public CompactionService { void SetCanceled(bool canceled) { canceled_ = canceled; } + void GetResult(CompactionServiceResult* deserialized) { + CompactionServiceResult::Read(result_, deserialized).PermitUncheckedError(); + } + CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() { return final_updated_status_.load(); } @@ -162,6 +172,7 @@ class MyTestCompactionService : public CompactionService { CompactionServiceJobStatus override_wait_status_ = CompactionServiceJobStatus::kFailure; bool is_override_wait_result_ = false; + std::string result_; std::string override_wait_result_; std::vector> listeners_; std::vector> @@ -331,6 +342,14 @@ TEST_F(CompactionServiceTest, BasicCompactions) { ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"}, options); ASSERT_GT(verify_passed, 0); + CompactionServiceResult result; + my_cs->GetResult(&result); + if (s.IsAborted()) { + ASSERT_NOK(result.status); + } else { + ASSERT_OK(result.status); + } + ASSERT_TRUE(result.stats.is_remote_compaction); Close(); } @@ -369,6 +388,12 @@ TEST_F(CompactionServiceTest, ManualCompaction) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); + + CompactionServiceResult result; + my_cs->GetResult(&result); + ASSERT_OK(result.status); + ASSERT_TRUE(result.stats.is_manual_compaction); + ASSERT_TRUE(result.stats.is_remote_compaction); } TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { @@ -601,11 +626,20 @@ TEST_F(CompactionServiceTest, CompactionInfo) { {file.db_path + "/" + file.name}, 2)); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::USER, info.priority); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::USER, info.priority); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); // Test priority BOTTOM env_->SetBackgroundThreads(1, Env::BOTTOM); + // This will set bottommost_level = true but is_full_compaction = false options.num_levels = 2; ReopenWithCompactionService(&options); my_cs = @@ -628,9 +662,71 @@ TEST_F(CompactionServiceTest, CompactionInfo) { } ASSERT_OK(dbfull()->TEST_WaitForCompact()); info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); ASSERT_EQ(Env::BOTTOM, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::BOTTOM, info.priority); + ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + + // Test Non-Bottommost Level + options.num_levels = 4; + ReopenWithCompactionService(&options); + my_cs = + static_cast_with_check(GetCompactionService()); + + for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(false, info.bottommost_level); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(false, info.bottommost_level); + + // Test Full Compaction + Bottommost Level + options.num_levels = 6; + ReopenWithCompactionService(&options); + my_cs = + static_cast_with_check(GetCompactionService()); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id))); + } + ASSERT_OK(Flush()); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(true, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(true, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); } TEST_F(CompactionServiceTest, FallbackLocalAuto) { diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 7e81530443..8ddb6a330d 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -19,80 +19,83 @@ struct CompactionJobStats { void Add(const CompactionJobStats& stats); // the elapsed time of this compaction in microseconds. - uint64_t elapsed_micros; + uint64_t elapsed_micros = 0; // the elapsed CPU time of this compaction in microseconds. - uint64_t cpu_micros; + uint64_t cpu_micros = 0; // Used internally indicating whether a subcompaction's // `num_input_records` is accurate. - bool has_num_input_records; + bool has_num_input_records = false; // the number of compaction input records. - uint64_t num_input_records; + uint64_t num_input_records = 0; // the number of blobs read from blob files - uint64_t num_blobs_read; + uint64_t num_blobs_read = 0; // the number of compaction input files (table files) - size_t num_input_files; + size_t num_input_files = 0; // the number of compaction input files at the output level (table files) - size_t num_input_files_at_output_level; + size_t num_input_files_at_output_level = 0; // the number of compaction output records. - uint64_t num_output_records; + uint64_t num_output_records = 0; // the number of compaction output files (table files) - size_t num_output_files; + size_t num_output_files = 0; // the number of compaction output files (blob files) - size_t num_output_files_blob; + size_t num_output_files_blob = 0; // true if the compaction is a full compaction (all live SST files input) - bool is_full_compaction; + bool is_full_compaction = false; // true if the compaction is a manual compaction - bool is_manual_compaction; + bool is_manual_compaction = false; + // true if the compaction ran in a remote worker + bool is_remote_compaction = false; // the total size of table files in the compaction input - uint64_t total_input_bytes; + uint64_t total_input_bytes = 0; // the total size of blobs read from blob files - uint64_t total_blob_bytes_read; + uint64_t total_blob_bytes_read = 0; // the total size of table files in the compaction output - uint64_t total_output_bytes; + uint64_t total_output_bytes = 0; // the total size of blob files in the compaction output - uint64_t total_output_bytes_blob; + uint64_t total_output_bytes_blob = 0; + ; // number of records being replaced by newer record associated with same key. // this could be a new value or a deletion entry for that key so this field // sums up all updated and deleted keys - uint64_t num_records_replaced; + uint64_t num_records_replaced = 0; // the sum of the uncompressed input keys in bytes. - uint64_t total_input_raw_key_bytes; + uint64_t total_input_raw_key_bytes = 0; // the sum of the uncompressed input values in bytes. - uint64_t total_input_raw_value_bytes; + uint64_t total_input_raw_value_bytes = 0; // the number of deletion entries before compaction. Deletion entries // can disappear after compaction because they expired - uint64_t num_input_deletion_records; + uint64_t num_input_deletion_records = 0; // number of deletion records that were found obsolete and discarded // because it is not possible to delete any more keys with this entry // (i.e. all possible deletions resulting from it have been completed) - uint64_t num_expired_deletion_records; + uint64_t num_expired_deletion_records = 0; // number of corrupt keys (ParseInternalKey returned false when applied to // the key) encountered and written out. - uint64_t num_corrupt_keys; + uint64_t num_corrupt_keys = 0; // Following counters are only populated if // options.report_bg_io_stats = true; // Time spent on file's Append() call. - uint64_t file_write_nanos; + uint64_t file_write_nanos = 0; // Time spent on sync file range. - uint64_t file_range_sync_nanos; + uint64_t file_range_sync_nanos = 0; // Time spent on file fsync. - uint64_t file_fsync_nanos; + uint64_t file_fsync_nanos = 0; // Time spent on preparing file write (fallocate, etc) - uint64_t file_prepare_write_nanos; + uint64_t file_prepare_write_nanos = 0; // 0-terminated strings storing the first 8 bytes of the smallest and // largest key in the output. @@ -102,10 +105,10 @@ struct CompactionJobStats { std::string largest_output_key_prefix; // number of single-deletes which do not meet a put - uint64_t num_single_del_fallthru; + uint64_t num_single_del_fallthru = 0; // number of single-deletes which meet something other than a put - uint64_t num_single_del_mismatch; + uint64_t num_single_del_mismatch = 0; // TODO: Add output_to_penultimate_level output information }; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e272e3a69a..9700e25af5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -466,14 +466,27 @@ struct CompactionServiceJobInfo { Env::Priority priority; + // Additional Compaction Details that can be useful in the CompactionService + CompactionReason compaction_reason; + bool is_full_compaction; + bool is_manual_compaction; + bool bottommost_level; + CompactionServiceJobInfo(std::string db_name_, std::string db_id_, std::string db_session_id_, uint64_t job_id_, - Env::Priority priority_) + Env::Priority priority_, + CompactionReason compaction_reason_, + bool is_full_compaction_, bool is_manual_compaction_, + bool bottommost_level_) : db_name(std::move(db_name_)), db_id(std::move(db_id_)), db_session_id(std::move(db_session_id_)), job_id(job_id_), - priority(priority_) {} + priority(priority_), + compaction_reason(compaction_reason_), + is_full_compaction(is_full_compaction_), + is_manual_compaction(is_manual_compaction_), + bottommost_level(bottommost_level_) {} }; struct CompactionServiceScheduleResponse { diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index cdb591f23c..37e39987e0 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -24,6 +24,7 @@ void CompactionJobStats::Reset() { is_full_compaction = false; is_manual_compaction = false; + is_remote_compaction = false; total_input_bytes = 0; total_blob_bytes_read = 0;