diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index e266babda5..c519b59590 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -424,6 +424,7 @@ struct CompactionServiceOutputFile { uint64_t paranoid_hash; bool marked_for_compaction; UniqueId64x2 unique_id{}; + TableProperties table_properties; CompactionServiceOutputFile() = default; CompactionServiceOutputFile( @@ -432,7 +433,8 @@ struct CompactionServiceOutputFile { uint64_t _oldest_ancester_time, uint64_t _file_creation_time, uint64_t _epoch_number, const std::string& _file_checksum, const std::string& _file_checksum_func_name, uint64_t _paranoid_hash, - bool _marked_for_compaction, UniqueId64x2 _unique_id) + bool _marked_for_compaction, UniqueId64x2 _unique_id, + const std::shared_ptr& _table_properties) : file_name(name), smallest_seqno(smallest), largest_seqno(largest), @@ -445,7 +447,8 @@ struct CompactionServiceOutputFile { file_checksum_func_name(_file_checksum_func_name), paranoid_hash(_paranoid_hash), marked_for_compaction(_marked_for_compaction), - unique_id(std::move(_unique_id)) {} + unique_id(std::move(_unique_id)), + table_properties(*_table_properties.get()) {} }; // CompactionServiceResult contains the compaction result from a different db diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 9181fef1d2..cf981cc95a 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1662,6 +1662,19 @@ TEST_F(CompactionJobTest, ResultSerialization) { std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)); std::string file_checksum_func_name = "MyAwesomeChecksumGenerator"; while (!rnd.OneIn(10)) { + TableProperties tp; + tp.user_collected_properties.emplace( + "UCP_Key1", rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.user_collected_properties.emplace( + "UCP_Key2", rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.readable_properties.emplace("RP_Key1", + rnd.RandomString(rnd.Uniform(kStrMaxLen))); + tp.readable_properties.emplace("RP_K2y2", + rnd.RandomString(rnd.Uniform(kStrMaxLen))); + + std::shared_ptr table_properties = + std::make_shared(tp); + UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)}; result.output_files.emplace_back( rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */, @@ -1677,7 +1690,8 @@ TEST_F(CompactionJobTest, ResultSerialization) { file_checksum /* file_checksum */, file_checksum_func_name /* file_checksum_func_name */, rnd64.Uniform(UINT64_MAX) /* paranoid_hash */, - rnd.OneIn(2) /* marked_for_compaction */, id); + rnd.OneIn(2) /* marked_for_compaction */, id /* unique_id */, + table_properties); } result.output_level = rnd.Uniform(10); result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen)); @@ -1698,6 +1712,21 @@ TEST_F(CompactionJobTest, ResultSerialization) { ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1)); ASSERT_TRUE(deserialized1.TEST_Equals(&result)); + for (size_t i = 0; i < result.output_files.size(); i++) { + for (const auto& prop : + result.output_files[i].table_properties.user_collected_properties) { + ASSERT_EQ(deserialized1.output_files[i] + .table_properties.user_collected_properties[prop.first], + prop.second); + } + for (const auto& prop : + result.output_files[i].table_properties.readable_properties) { + ASSERT_EQ(deserialized1.output_files[i] + .table_properties.readable_properties[prop.first], + prop.second); + } + } + // Test mismatch deserialized1.stats.num_input_files += 10; std::string mismatch; diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index 51d378ff01..a3c3552ed2 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -108,6 +108,12 @@ class CompactionOutputs { Status Finish(const Status& intput_status, const SeqnoToTimeMapping& seqno_to_time_mapping); + // Update output table properties from already populated TableProperties. + // Used for remote compaction + void UpdateTableProperties(const TableProperties& table_properties) { + current_output().table_properties = + std::make_shared(table_properties); + } // Update output table properties from table builder void UpdateTableProperties() { current_output().table_properties = diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index a39dff3f9f..ff6dd91822 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -212,6 +212,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( sub_compact->Current().AddOutput(std::move(meta), cfd->internal_comparator(), false, true, file.paranoid_hash); + sub_compact->Current().UpdateTableProperties(file.table_properties); } sub_compact->compaction_job_stats = compaction_result.stats; sub_compact->Current().SetNumOutputRecords( @@ -383,7 +384,8 @@ Status CompactionServiceCompactionJob::Run() { meta.largest.Encode().ToString(), meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number, meta.file_checksum, meta.file_checksum_func_name, output_file.validator.GetHash(), - meta.marked_for_compaction, meta.unique_id); + meta.marked_for_compaction, meta.unique_id, + output_file.table_properties); } TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0", @@ -475,6 +477,159 @@ static std::unordered_map cs_input_type_info = { OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, }; +static std::unordered_map + table_properties_type_info = { + {"orig_file_number", + {offsetof(struct TableProperties, orig_file_number), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"data_size", + {offsetof(struct TableProperties, data_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"index_size", + {offsetof(struct TableProperties, index_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"index_partitions", + {offsetof(struct TableProperties, index_partitions), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"top_level_index_size", + {offsetof(struct TableProperties, top_level_index_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"index_key_is_user_key", + {offsetof(struct TableProperties, index_key_is_user_key), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"index_value_is_delta_encoded", + {offsetof(struct TableProperties, index_value_is_delta_encoded), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"filter_size", + {offsetof(struct TableProperties, filter_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"raw_key_size", + {offsetof(struct TableProperties, raw_key_size), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"raw_value_size", + {offsetof(struct TableProperties, raw_value_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_data_blocks", + {offsetof(struct TableProperties, num_data_blocks), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_entries", + {offsetof(struct TableProperties, num_entries), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"num_filter_entries", + {offsetof(struct TableProperties, num_filter_entries), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_deletions", + {offsetof(struct TableProperties, num_deletions), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"num_merge_operands", + {offsetof(struct TableProperties, num_merge_operands), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"num_range_deletions", + {offsetof(struct TableProperties, num_range_deletions), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"format_version", + {offsetof(struct TableProperties, format_version), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"fixed_key_len", + {offsetof(struct TableProperties, fixed_key_len), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"column_family_id", + {offsetof(struct TableProperties, column_family_id), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"creation_time", + {offsetof(struct TableProperties, creation_time), OptionType::kUInt64T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"oldest_key_time", + {offsetof(struct TableProperties, oldest_key_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"file_creation_time", + {offsetof(struct TableProperties, file_creation_time), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"slow_compression_estimated_data_size", + {offsetof(struct TableProperties, + slow_compression_estimated_data_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"fast_compression_estimated_data_size", + {offsetof(struct TableProperties, + fast_compression_estimated_data_size), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"external_sst_file_global_seqno_offset", + {offsetof(struct TableProperties, + external_sst_file_global_seqno_offset), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"tail_start_offset", + {offsetof(struct TableProperties, tail_start_offset), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"user_defined_timestamps_persisted", + {offsetof(struct TableProperties, user_defined_timestamps_persisted), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"key_largest_seqno", + {offsetof(struct TableProperties, key_largest_seqno), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"db_id", + {offsetof(struct TableProperties, db_id), OptionType::kEncodedString}}, + {"db_session_id", + {offsetof(struct TableProperties, db_session_id), + OptionType::kEncodedString}}, + {"db_host_id", + {offsetof(struct TableProperties, db_host_id), + OptionType::kEncodedString}}, + {"column_family_name", + {offsetof(struct TableProperties, column_family_name), + OptionType::kEncodedString}}, + {"filter_policy_name", + {offsetof(struct TableProperties, filter_policy_name), + OptionType::kEncodedString}}, + {"comparator_name", + {offsetof(struct TableProperties, comparator_name), + OptionType::kEncodedString}}, + {"merge_operator_name", + {offsetof(struct TableProperties, merge_operator_name), + OptionType::kEncodedString}}, + {"prefix_extractor_name", + {offsetof(struct TableProperties, prefix_extractor_name), + OptionType::kEncodedString}}, + {"property_collectors_names", + {offsetof(struct TableProperties, property_collectors_names), + OptionType::kEncodedString}}, + {"compression_name", + {offsetof(struct TableProperties, compression_name), + OptionType::kEncodedString}}, + {"compression_options", + {offsetof(struct TableProperties, compression_options), + OptionType::kEncodedString}}, + {"seqno_to_time_mapping", + {offsetof(struct TableProperties, seqno_to_time_mapping), + OptionType::kEncodedString}}, + {"user_collected_properties", + OptionTypeInfo::StringMap( + offsetof(struct TableProperties, user_collected_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, + {"readable_properties", + OptionTypeInfo::StringMap( + offsetof(struct TableProperties, readable_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, +}; static std::unordered_map cs_output_file_type_info = { @@ -531,6 +686,11 @@ static std::unordered_map offsetof(struct CompactionServiceOutputFile, unique_id), OptionVerificationType::kNormal, OptionTypeFlags::kNone, {0, OptionType::kUInt64T})}, + {"table_properties", + OptionTypeInfo::Struct( + "table_properties", &table_properties_type_info, + offsetof(struct CompactionServiceOutputFile, table_properties), + OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, }; static std::unordered_map diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h index c34c860167..9feb61fa9c 100644 --- a/include/rocksdb/utilities/options_type.h +++ b/include/rocksdb/utilities/options_type.h @@ -58,6 +58,7 @@ enum class OptionType { kEncodedString, kTemperature, kArray, + kStringMap, // Map of kUnknown, }; @@ -441,6 +442,63 @@ class OptionTypeInfo { return info; } + static OptionTypeInfo StringMap(int _offset, + OptionVerificationType _verification, + OptionTypeFlags _flags, + char kv_separator = '=', + char item_separator = ';') { + OptionTypeInfo info(_offset, OptionType::kStringMap, _verification, _flags); + info.SetParseFunc( + [kv_separator, item_separator](const ConfigOptions&, const std::string&, + const std::string& value, void* addr) { + std::map map; + Status s; + for (size_t start = 0, end = 0; + s.ok() && start < value.size() && end != std::string::npos; + start = end + 1) { + std::string token; + s = OptionTypeInfo::NextToken(value, item_separator, start, &end, + &token); + if (s.ok() && !token.empty()) { + size_t pos = token.find(kv_separator); + assert(pos != std::string::npos); + std::string k = token.substr(0, pos); + std::string v = token.substr(pos + 1); + std::string decoded_key; + std::string decoded_value; + (Slice(k)).DecodeHex(&decoded_key); + (Slice(v)).DecodeHex(&decoded_value); + map.emplace(std::move(decoded_key), std::move(decoded_value)); + } + } + if (s.ok()) { + *(static_cast*>(addr)) = map; + } + return s; + }); + info.SetSerializeFunc( + [kv_separator, item_separator](const ConfigOptions&, const std::string&, + const void* addr, std::string* value) { + const auto map = + static_cast*>(addr); + value->append("{"); + for (const auto& entry : *map) { + value->append(Slice(entry.first).ToString(true)); + *value += kv_separator; + value->append(Slice(entry.second).ToString(true)); + *value += item_separator; + } + value->append("}"); + return Status::OK(); + }); + info.SetEqualsFunc([](const ConfigOptions&, const std::string&, + const void* addr1, const void* addr2, std::string*) { + return (*static_cast*>(addr1) == + *static_cast*>(addr2)); + }); + return info; + } + // Create a new std::shared_ptr OptionTypeInfo // This function will call the T::CreateFromString method to create a new // std::shared_ptr object. diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index ec24721b7a..ec4a695b63 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -624,6 +624,7 @@ struct BlockBasedTableBuilder::Rep { props.db_id = tbo.db_id; props.db_session_id = tbo.db_session_id; props.db_host_id = ioptions.db_host_id; + props.format_version = table_options.format_version; if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); }