From 57a8e69d4e75c317f6ab02d6fc2fdd404b366794 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 25 Oct 2024 13:13:12 -0700 Subject: [PATCH] Include TableProperties in the CompactionServiceResult (#13089) Summary: In Remote Compactions, the primary host receives the serialized compaction result from the remote worker and deserializes it to build the output. Unlike Local Compactions, where table properties are built by TableBuilder, in Remote Compactions, these properties were not included in the serialized compaction result. This was likely done intentionally since the table properties are already available in the SST files. Because TableProperties are not populated as part of CompactionOutputs for remote compactions, we were unable to log the table properties in OnCompactionComplete and use them for verification. We are adding the TableProperties as part of the CompactionServiceOutputFile in this PR. By including the TableProperties in the serialized compaction result, the primary host will be able to access them and verify that they match the values read from the actual SST files. We are also adding the populating `format_version` in table_properties of in TableBuilder. This has not been a big issue because the `format_version` is written to the SST files directly from `TableOptions.format_version`. When loaded from the SST files, it's populated directly by reading from the MetaBlock. This info has only been missing in the TableBuilder's Rep.props. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13089 Test Plan: ``` ./compaction_job_test ``` ``` ./compaction_service_test ``` Reviewed By: pdillinger Differential Revision: D64878740 Pulled By: jaykorean fbshipit-source-id: b6f2fdce851e6477ecb4dd5a87cdc62e176b746b --- db/compaction/compaction_job.h | 7 +- db/compaction/compaction_job_test.cc | 31 +++- db/compaction/compaction_outputs.h | 6 + db/compaction/compaction_service_job.cc | 162 +++++++++++++++++- include/rocksdb/utilities/options_type.h | 58 +++++++ .../block_based/block_based_table_builder.cc | 1 + 6 files changed, 261 insertions(+), 4 deletions(-) diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index e266babda5..c519b59590 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -424,6 +424,7 @@ struct CompactionServiceOutputFile { uint64_t paranoid_hash; bool marked_for_compaction; UniqueId64x2 unique_id{}; + TableProperties table_properties; CompactionServiceOutputFile() = default; CompactionServiceOutputFile( @@ -432,7 +433,8 @@ struct CompactionServiceOutputFile { uint64_t _oldest_ancester_time, uint64_t _file_creation_time, uint64_t _epoch_number, const std::string& _file_checksum, const std::string& _file_checksum_func_name, uint64_t _paranoid_hash, - bool _marked_for_compaction, UniqueId64x2 _unique_id) + bool _marked_for_compaction, UniqueId64x2 _unique_id, + const std::shared_ptr& _table_properties) : file_name(name), smallest_seqno(smallest), largest_seqno(largest), @@ -445,7 +447,8 @@ struct CompactionServiceOutputFile { file_checksum_func_name(_file_checksum_func_name), paranoid_hash(_paranoid_hash), marked_for_compaction(_marked_for_compaction), - unique_id(std::move(_unique_id)) {} + unique_id(std::move(_unique_id)), + table_properties(*_table_properties.get()) {} }; // CompactionServiceResult contains the compaction result from a different db diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 9181fef1d2..cf981cc95a 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1662,6 +1662,19 @@ TEST_F(CompactionJobTest, ResultSerialization) { std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)); std::string file_checksum_func_name = "MyAwesomeChecksumGenerator"; while (!rnd.OneIn(10)) { + TableProperties tp; + tp.user_collected_properties.emplace( + "UCP_Key1", rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.user_collected_properties.emplace( + "UCP_Key2", rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.readable_properties.emplace("RP_Key1", + rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.readable_properties.emplace("RP_K2y2", + rnd.RandomString(rnd.Uniform(kStrMaxLen))); + + std::shared_ptr table_properties = + std::make_shared(tp); + UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)}; result.output_files.emplace_back( rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */, @@ -1677,7 +1690,8 @@ TEST_F(CompactionJobTest, ResultSerialization) { file_checksum /* file_checksum */, file_checksum_func_name /* file_checksum_func_name */, rnd64.Uniform(UINT64_MAX) /* paranoid_hash */, - rnd.OneIn(2) /* marked_for_compaction */, id); + rnd.OneIn(2) /* marked_for_compaction */, id /* unique_id */, + table_properties); } result.output_level = rnd.Uniform(10); result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen)); @@ -1698,6 +1712,21 @@ TEST_F(CompactionJobTest, ResultSerialization) { ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1)); ASSERT_TRUE(deserialized1.TEST_Equals(&result)); + for (size_t i = 0; i < result.output_files.size(); i++) { + for (const auto& prop : + result.output_files[i].table_properties.user_collected_properties) { + ASSERT_EQ(deserialized1.output_files[i] + .table_properties.user_collected_properties[prop.first], + prop.second); + } + for (const auto& prop : + result.output_files[i].table_properties.readable_properties) { + ASSERT_EQ(deserialized1.output_files[i] + .table_properties.readable_properties[prop.first], + prop.second); + } + } + // Test mismatch deserialized1.stats.num_input_files += 10; std::string mismatch; diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index 51d378ff01..a3c3552ed2 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -108,6 +108,12 @@ class CompactionOutputs { Status Finish(const Status& intput_status, const SeqnoToTimeMapping& seqno_to_time_mapping); + // Update output table properties from already populated TableProperties. + // Used for remote compaction + void UpdateTableProperties(const TableProperties& table_properties) { + current_output().table_properties = + std::make_shared(table_properties); + } // Update output table properties from table builder void UpdateTableProperties() { current_output().table_properties = diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index a39dff3f9f..ff6dd91822 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -212,6 +212,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( sub_compact->Current().AddOutput(std::move(meta), cfd->internal_comparator(), false, true, file.paranoid_hash); + sub_compact->Current().UpdateTableProperties(file.table_properties); } sub_compact->compaction_job_stats = compaction_result.stats; sub_compact->Current().SetNumOutputRecords( @@ -383,7 +384,8 @@ Status CompactionServiceCompactionJob::Run() { meta.largest.Encode().ToString(), meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number, meta.file_checksum, meta.file_checksum_func_name, output_file.validator.GetHash(), - meta.marked_for_compaction, meta.unique_id); + meta.marked_for_compaction, meta.unique_id, + output_file.table_properties); } TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0", @@ -475,6 +477,159 @@ static std::unordered_map cs_input_type_info = { OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, }; +static std::unordered_map + table_properties_type_info = { + {"orig_file_number", + {offsetof(struct TableProperties, orig_file_number), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"data_size", + {offsetof(struct TableProperties, data_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"index_size", + {offsetof(struct TableProperties, index_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"index_partitions", + {offsetof(struct TableProperties, index_partitions), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"top_level_index_size", + {offsetof(struct TableProperties, top_level_index_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"index_key_is_user_key", + {offsetof(struct TableProperties, index_key_is_user_key), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"index_value_is_delta_encoded", + {offsetof(struct TableProperties, index_value_is_delta_encoded), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"filter_size", + {offsetof(struct TableProperties, filter_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"raw_key_size", + {offsetof(struct TableProperties, raw_key_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"raw_value_size", + {offsetof(struct TableProperties, raw_value_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_data_blocks", + {offsetof(struct TableProperties, num_data_blocks), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_entries", + {offsetof(struct TableProperties, num_entries), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"num_filter_entries", + {offsetof(struct TableProperties, num_filter_entries), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_deletions", + {offsetof(struct TableProperties, num_deletions), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"num_merge_operands", + {offsetof(struct TableProperties, num_merge_operands), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_range_deletions", + {offsetof(struct TableProperties, num_range_deletions), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"format_version", + {offsetof(struct TableProperties, format_version), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"fixed_key_len", + {offsetof(struct TableProperties, fixed_key_len), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"column_family_id", + {offsetof(struct TableProperties, column_family_id), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"creation_time", + {offsetof(struct TableProperties, creation_time), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"oldest_key_time", + {offsetof(struct TableProperties, oldest_key_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_creation_time", + {offsetof(struct TableProperties, file_creation_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"slow_compression_estimated_data_size", + {offsetof(struct TableProperties, + slow_compression_estimated_data_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"fast_compression_estimated_data_size", + {offsetof(struct TableProperties, + fast_compression_estimated_data_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"external_sst_file_global_seqno_offset", + {offsetof(struct TableProperties, + external_sst_file_global_seqno_offset), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"tail_start_offset", + {offsetof(struct TableProperties, tail_start_offset), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"user_defined_timestamps_persisted", + {offsetof(struct TableProperties, user_defined_timestamps_persisted), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"key_largest_seqno", + {offsetof(struct TableProperties, key_largest_seqno), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"db_id", + {offsetof(struct TableProperties, db_id), OptionType::kEncodedString}}, + {"db_session_id", + {offsetof(struct TableProperties, db_session_id), + OptionType::kEncodedString}}, + {"db_host_id", + {offsetof(struct TableProperties, db_host_id), + OptionType::kEncodedString}}, + {"column_family_name", + {offsetof(struct TableProperties, column_family_name), + OptionType::kEncodedString}}, + {"filter_policy_name", + {offsetof(struct TableProperties, filter_policy_name), + OptionType::kEncodedString}}, + {"comparator_name", + {offsetof(struct TableProperties, comparator_name), + OptionType::kEncodedString}}, + {"merge_operator_name", + {offsetof(struct TableProperties, merge_operator_name), + OptionType::kEncodedString}}, + {"prefix_extractor_name", + {offsetof(struct TableProperties, prefix_extractor_name), + OptionType::kEncodedString}}, + {"property_collectors_names", + {offsetof(struct TableProperties, property_collectors_names), + OptionType::kEncodedString}}, + {"compression_name", + {offsetof(struct TableProperties, compression_name), + OptionType::kEncodedString}}, + {"compression_options", + {offsetof(struct TableProperties, compression_options), + OptionType::kEncodedString}}, + {"seqno_to_time_mapping", + {offsetof(struct TableProperties, seqno_to_time_mapping), + OptionType::kEncodedString}}, + {"user_collected_properties", + OptionTypeInfo::StringMap( + offsetof(struct TableProperties, user_collected_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, + {"readable_properties", + OptionTypeInfo::StringMap( + offsetof(struct TableProperties, readable_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, +}; static std::unordered_map cs_output_file_type_info = { @@ -531,6 +686,11 @@ static std::unordered_map offsetof(struct CompactionServiceOutputFile, unique_id), OptionVerificationType::kNormal, OptionTypeFlags::kNone, {0, OptionType::kUInt64T})}, + {"table_properties", + OptionTypeInfo::Struct( + "table_properties", &table_properties_type_info, + offsetof(struct CompactionServiceOutputFile, table_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, }; static std::unordered_map diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h index c34c860167..9feb61fa9c 100644 --- a/include/rocksdb/utilities/options_type.h +++ b/include/rocksdb/utilities/options_type.h @@ -58,6 +58,7 @@ enum class OptionType { kEncodedString, kTemperature, kArray, + kStringMap, // Map of kUnknown, }; @@ -441,6 +442,63 @@ class OptionTypeInfo { return info; } + static OptionTypeInfo StringMap(int _offset, + OptionVerificationType _verification, + OptionTypeFlags _flags, + char kv_separator = '=', + char item_separator = ';') { + OptionTypeInfo info(_offset, OptionType::kStringMap, _verification, _flags); + info.SetParseFunc( + [kv_separator, item_separator](const ConfigOptions&, const std::string&, + const std::string& value, void* addr) { + std::map map; + Status s; + for (size_t start = 0, end = 0; + s.ok() && start < value.size() && end != std::string::npos; + start = end + 1) { + std::string token; + s = OptionTypeInfo::NextToken(value, item_separator, start, &end, + &token); + if (s.ok() && !token.empty()) { + size_t pos = token.find(kv_separator); + assert(pos != std::string::npos); + std::string k = token.substr(0, pos); + std::string v = token.substr(pos + 1); + std::string decoded_key; + std::string decoded_value; + (Slice(k)).DecodeHex(&decoded_key); + (Slice(v)).DecodeHex(&decoded_value); + map.emplace(std::move(decoded_key), std::move(decoded_value)); + } + } + if (s.ok()) { + *(static_cast*>(addr)) = map; + } + return s; + }); + info.SetSerializeFunc( + [kv_separator, item_separator](const ConfigOptions&, const std::string&, + const void* addr, std::string* value) { + const auto map = + static_cast*>(addr); + value->append("{"); + for (const auto& entry : *map) { + value->append(Slice(entry.first).ToString(true)); + *value += kv_separator; + value->append(Slice(entry.second).ToString(true)); + *value += item_separator; + } + value->append("}"); + return Status::OK(); + }); + info.SetEqualsFunc([](const ConfigOptions&, const std::string&, + const void* addr1, const void* addr2, std::string*) { + return (*static_cast*>(addr1) == + *static_cast*>(addr2)); + }); + return info; + } + // Create a new std::shared_ptr OptionTypeInfo // This function will call the T::CreateFromString method to create a new // std::shared_ptr object. diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index ec24721b7a..ec4a695b63 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -624,6 +624,7 @@ struct BlockBasedTableBuilder::Rep { props.db_id = tbo.db_id; props.db_session_id = tbo.db_session_id; props.db_host_id = ioptions.db_host_id; + props.format_version = table_options.format_version; if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); }