From 4b317234338597cb60fd6c009d0fd9b30d892fa5 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Mon, 9 May 2016 15:57:19 -0700 Subject: [PATCH] Add bottommost_compression option Summary: Add a new option that can be used to set a specific compression algorithm for bottommost level. This option will only affect levels larger than base level. I have also updated CompactionJobInfo to include the compression algorithm used in compaction Test Plan: added new unittest existing unittests Reviewers: andrewkr, yhchiang, sdong Reviewed By: sdong Subscribers: lightmark, andrewkr, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D57669 --- HISTORY.md | 5 ++ db/compaction.cc | 5 +- db/compaction_picker.cc | 22 +++++-- db/compaction_picker.h | 1 + db/db_impl.cc | 1 + db/db_test2.cc | 95 +++++++++++++++++++++++++++++ include/rocksdb/immutable_options.h | 2 + include/rocksdb/listener.h | 4 ++ include/rocksdb/options.h | 10 +++ util/options.cc | 7 +++ util/options_helper.h | 8 ++- util/options_settable_test.cc | 1 + util/options_test.cc | 3 + 13 files changed, 154 insertions(+), 10 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index ad2050af01..2154e136ff 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,9 @@ # Rocksdb Change Log +## Unreleased +### Public API changes +* Add bottommost_compression option, This option can be used to set a specific compression algorithm for the bottommost level (Last level containing files in the DB). +* Introduce CompactionJobInfo::compression, This field state the compression algorithm used to generate the output files of the compaction. + ## 4.8.0 (5/2/2016) ### Public API Change * Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes. diff --git a/db/compaction.cc b/db/compaction.cc index d7a7943c0d..c709365739 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -203,8 +203,9 @@ Compaction::~Compaction() { } bool Compaction::InputCompressionMatchesOutput() const { - int base_level = input_version_->storage_info()->base_level(); - bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_, + VersionStorageInfo* vstorage = input_version_->storage_info(); + int base_level = vstorage->base_level(); + bool matches = (GetCompressionType(*cfd_->ioptions(), vstorage, start_level_, base_level) == output_compression_); if (matches) { TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches"); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 91b9a2b5a4..1a36930bc9 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -109,12 +109,20 @@ SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { // matter what the values of the other two parameters are. // Otherwise, the compression type is determined based on options and level. CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, + const VersionStorageInfo* vstorage, int level, int base_level, const bool enable_compression) { if (!enable_compression) { // disable compression return kNoCompression; } + + // If bottommost_compression is set and we are compacting to the + // bottommost level then we should use it. + if (ioptions.bottommost_compression != kDisableCompressionOption && + level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) { + return ioptions.bottommost_compression; + } // If the use has specified a different compression level for each level, // then pick the compression for that level. if (!ioptions.compression_per_level.empty()) { @@ -505,7 +513,7 @@ Compaction* CompactionPicker::CompactRange( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, - GetCompressionType(ioptions_, output_level, 1), + GetCompressionType(ioptions_, vstorage, output_level, 1), /* grandparents */ {}, /* is manual */ true); if (start_level == 0) { level0_compactions_in_progress_.insert(c); @@ -605,8 +613,8 @@ Compaction* CompactionPicker::CompactRange( vstorage, mutable_cf_options, std::move(compaction_inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(input_level), - output_path_id, - GetCompressionType(ioptions_, output_level, vstorage->base_level()), + output_path_id, GetCompressionType(ioptions_, vstorage, output_level, + vstorage->base_level()), std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); @@ -1026,7 +1034,8 @@ Compaction* LevelCompactionPicker::PickCompaction( mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(level), GetPathId(ioptions_, mutable_cf_options, output_level), - GetCompressionType(ioptions_, output_level, vstorage->base_level()), + GetCompressionType(ioptions_, vstorage, output_level, + vstorage->base_level()), std::move(grandparents), is_manual, score, false /* deletion_compaction */, compaction_reason); @@ -1638,7 +1647,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( return new Compaction( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, - GetCompressionType(ioptions_, start_level, 1, enable_compression), + GetCompressionType(ioptions_, vstorage, start_level, 1, + enable_compression), /* grandparents */ {}, /* is manual */ false, score, false /* deletion_compaction */, compaction_reason); } @@ -1763,7 +1773,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( vstorage->num_levels() - 1, mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1), /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1), + GetCompressionType(ioptions_, vstorage, vstorage->num_levels() - 1, 1), /* grandparents */ {}, /* is manual */ false, score, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 0503c86924..c1b0fabbc8 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -350,6 +350,7 @@ class NullCompactionPicker : public CompactionPicker { #endif // !ROCKSDB_LITE CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, + const VersionStorageInfo* vstorage, int level, int base_level, const bool enable_compression = true); diff --git a/db/db_impl.cc b/db/db_impl.cc index fdb924deaa..0d621b6398 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2066,6 +2066,7 @@ void DBImpl::NotifyOnCompactionCompleted( info.stats = compaction_job_stats; info.table_properties = c->GetOutputTableProperties(); info.compaction_reason = c->compaction_reason(); + info.compression = c->output_compression(); for (size_t i = 0; i < c->num_input_levels(); ++i) { for (const auto fmd : *c->inputs(i)) { auto fn = TableFileName(db_options_.db_paths, fmd->fd.GetNumber(), diff --git a/db/db_test2.cc b/db/db_test2.cc index b28db2f896..4e03e20916 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -817,6 +817,101 @@ TEST_F(DBTest2, FirstSnapshotTest) { db_->ReleaseSnapshot(s1); } +class CompactionCompressionListener : public EventListener { + public: + explicit CompactionCompressionListener(Options* db_options) + : db_options_(db_options) {} + + void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override { + // Figure out last level with files + int bottommost_level = 0; + for (int level = 0; level < db->NumberLevels(); level++) { + std::string files_at_level; + ASSERT_TRUE( + db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level), + &files_at_level)); + if (files_at_level != "0") { + bottommost_level = level; + } + } + + if (db_options_->bottommost_compression != kDisableCompressionOption && + ci.output_level == bottommost_level && ci.output_level >= 2) { + ASSERT_EQ(ci.compression, db_options_->bottommost_compression); + } else if (db_options_->compression_per_level.size() != 0) { + ASSERT_EQ(ci.compression, + db_options_->compression_per_level[ci.output_level]); + } else { + ASSERT_EQ(ci.compression, db_options_->compression); + } + max_level_checked = std::max(max_level_checked, ci.output_level); + } + + int max_level_checked = 0; + const Options* db_options_; +}; + +TEST_F(DBTest2, CompressionOptions) { + if (!Zlib_Supported() || !Snappy_Supported()) { + return; + } + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.max_bytes_for_level_base = 100; + options.max_bytes_for_level_multiplier = 2; + options.num_levels = 7; + options.max_background_compactions = 1; + options.base_background_compactions = 1; + + CompactionCompressionListener* listener = + new CompactionCompressionListener(&options); + options.listeners.emplace_back(listener); + + const int kKeySize = 5; + const int kValSize = 20; + Random rnd(301); + + for (int iter = 0; iter <= 2; iter++) { + listener->max_level_checked = 0; + + if (iter == 0) { + // Use different compression algorithms for different levels but + // always use Zlib for bottommost level + options.compression_per_level = {kNoCompression, kNoCompression, + kNoCompression, kSnappyCompression, + kSnappyCompression, kSnappyCompression, + kZlibCompression}; + options.compression = kNoCompression; + options.bottommost_compression = kZlibCompression; + } else if (iter == 1) { + // Use Snappy except for bottommost level use ZLib + options.compression_per_level = {}; + options.compression = kSnappyCompression; + options.bottommost_compression = kZlibCompression; + } else if (iter == 2) { + // Use Snappy everywhere + options.compression_per_level = {}; + options.compression = kSnappyCompression; + options.bottommost_compression = kDisableCompressionOption; + } + + DestroyAndReopen(options); + // Write 10 random files + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 5; j++) { + ASSERT_OK( + Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize))); + } + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForCompact(); + } + + // Make sure that we wrote enough to check all 7 levels + ASSERT_EQ(listener->max_level_checked, 6); + } +} + class PinL0IndexAndFilterBlocksTest : public DBTestBase, public testing::WithParamInterface { public: diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 5a1500826a..0a9c29bf67 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -85,6 +85,8 @@ struct ImmutableCFOptions { std::vector compression_per_level; + CompressionType bottommost_compression; + CompressionOptions compression_opts; bool level_compaction_dynamic_level_bytes; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 6b228b5c96..9c1fa54497 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -20,6 +20,7 @@ typedef std::unordered_map> class DB; class Status; struct CompactionJobStats; +enum CompressionType : char; enum class TableFileCreationReason { kFlush, @@ -142,6 +143,9 @@ struct CompactionJobInfo { // Reason to run the compaction CompactionReason compaction_reason; + // Compression algorithm used for output files + CompressionType compression; + // If non-null, this variable stores detailed information // about this compaction. CompactionJobStats stats; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9853d7d01d..fb5eab5d46 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -64,6 +64,9 @@ enum CompressionType : char { kXpressCompression = 0x6, // zstd format is not finalized yet so it's subject to changes. kZSTDNotFinalCompression = 0x40, + + // kDisableCompressionOption is used to disable some compression options. + kDisableCompressionOption = -1, }; enum CompactionStyle : char { @@ -369,6 +372,13 @@ struct ColumnFamilyOptions { // change when data grows. std::vector compression_per_level; + // Compression algorithm that will be used for the bottommost level that + // contain files. If level-compaction is used, this option will only affect + // levels after base level. + // + // Default: kDisableCompressionOption (Disabled) + CompressionType bottommost_compression; + // different options for compression algorithms CompressionOptions compression_opts; diff --git a/util/options.cc b/util/options.cc index a4587d2ed9..d7f86c877b 100644 --- a/util/options.cc +++ b/util/options.cc @@ -66,6 +66,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) use_fsync(options.use_fsync), compression(options.compression), compression_per_level(options.compression_per_level), + bottommost_compression(options.bottommost_compression), compression_opts(options.compression_opts), level_compaction_dynamic_level_bytes( options.level_compaction_dynamic_level_bytes), @@ -88,6 +89,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() min_write_buffer_number_to_merge(1), max_write_buffer_number_to_maintain(0), compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), + bottommost_compression(kDisableCompressionOption), prefix_extractor(nullptr), num_levels(7), level0_file_num_compaction_trigger(4), @@ -146,6 +148,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) options.max_write_buffer_number_to_maintain), compression(options.compression), compression_per_level(options.compression_per_level), + bottommost_compression(options.bottommost_compression), compression_opts(options.compression_opts), prefix_extractor(options.prefix_extractor), num_levels(options.num_levels), @@ -494,6 +497,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const { Header(log, " Options.compression: %s", CompressionTypeToString(compression).c_str()); } + Header(log, " Options.bottommost_compression: %s", + bottommost_compression == kDisableCompressionOption + ? "Disabled" + : CompressionTypeToString(bottommost_compression).c_str()); Header(log, " Options.prefix_extractor: %s", prefix_extractor == nullptr ? "nullptr" : prefix_extractor->Name()); Header(log, " Options.num_levels: %d", num_levels); diff --git a/util/options_helper.h b/util/options_helper.h index 947b8a9ec5..909cbc9ec1 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -460,6 +460,9 @@ static std::unordered_map cf_options_type_info = { {"compression_per_level", {offsetof(struct ColumnFamilyOptions, compression_per_level), OptionType::kVectorCompressionType, OptionVerificationType::kNormal}}, + {"bottommost_compression", + {offsetof(struct ColumnFamilyOptions, bottommost_compression), + OptionType::kCompressionType, OptionVerificationType::kNormal}}, {"comparator", {offsetof(struct ColumnFamilyOptions, comparator), OptionType::kComparator, OptionVerificationType::kByName}}, @@ -575,8 +578,9 @@ static std::unordered_map {"kBZip2Compression", kBZip2Compression}, {"kLZ4Compression", kLZ4Compression}, {"kLZ4HCCompression", kLZ4HCCompression}, - {"kXpressCompression", kXpressCompression }, - {"kZSTDNotFinalCompression", kZSTDNotFinalCompression}}; + {"kXpressCompression", kXpressCompression}, + {"kZSTDNotFinalCompression", kZSTDNotFinalCompression}, + {"kDisableCompressionOption", kDisableCompressionOption}}; static std::unordered_map block_base_table_index_type_string_map = { diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index 8b79eacbf4..b4c0fd4cf3 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -397,6 +397,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "max_bytes_for_level_multiplier=60;" "memtable_factory=SkipListFactory;" "compression=kNoCompression;" + "bottommost_compression=kDisableCompressionOption;" "min_partial_merge_operands=7576;" "level0_stop_writes_trigger=33;" "num_levels=99;" diff --git a/util/options_test.cc b/util/options_test.cc index 131a80a799..03b2780f67 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -103,6 +103,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { "kLZ4HCCompression:" "kXpressCompression:" "kZSTDNotFinalCompression"}, + {"bottommost_compression", "kLZ4Compression"}, {"compression_opts", "4:5:6:7"}, {"num_levels", "8"}, {"level0_file_num_compaction_trigger", "8"}, @@ -202,6 +203,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.compression_opts.level, 5); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7); + ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression); ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9); @@ -608,6 +610,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) { ASSERT_EQ(new_options.compression_opts.level, 5); ASSERT_EQ(new_options.compression_opts.strategy, 6); ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0); + ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption); ASSERT_EQ(new_options.write_buffer_size, 10U); ASSERT_EQ(new_options.max_write_buffer_number, 16); BlockBasedTableOptions new_block_based_table_options =