From d79be3dca24016c5d6ce218cc6baab6bf928cb97 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Fri, 21 Apr 2023 21:57:40 -0700 Subject: [PATCH] Changes and enhancements to compression stats, thresholds (#11388) Summary: ## Option API updates * Add new CompressionOptions::max_compressed_bytes_per_kb, which corresponds to 1024.0 / min allowable compression ratio. This avoids the hard-coded minimum ratio of 8/7. * Remove unnecessary constructor for CompressionOptions. * Document undocumented CompressionOptions. Use idiom for default values shown clearly in one place (not precariously repeated). ## Stat API updates * Deprecate the BYTES_COMPRESSED, BYTES_DECOMPRESSED histograms. Histograms incur substantial extra space & time costs compared to tickers, and the distribution of uncompressed data block sizes tends to be uninteresting. If we're interested in that distribution, I don't see why it should be limited to blocks stored as compressed. * Deprecate the NUMBER_BLOCK_NOT_COMPRESSED ticker, because the name is very confusing. * New or existing tickers relevant to compression: * BYTES_COMPRESSED_FROM * BYTES_COMPRESSED_TO * BYTES_COMPRESSION_BYPASSED * BYTES_COMPRESSION_REJECTED * COMPACT_WRITE_BYTES + FLUSH_WRITE_BYTES (both existing) * NUMBER_BLOCK_COMPRESSED (existing) * NUMBER_BLOCK_COMPRESSION_BYPASSED * NUMBER_BLOCK_COMPRESSION_REJECTED * BYTES_DECOMPRESSED_FROM * BYTES_DECOMPRESSED_TO We can compute a number of things with these stats: * "Successful" compression ratio: BYTES_COMPRESSED_FROM / BYTES_COMPRESSED_TO * Compression ratio of data on which compression was attempted: (BYTES_COMPRESSED_FROM + BYTES_COMPRESSION_REJECTED) / (BYTES_COMPRESSED_TO + BYTES_COMPRESSION_REJECTED) * Compression ratio of data that could be eligible for compression: (BYTES_COMPRESSED_FROM + X) / (BYTES_COMPRESSED_TO + X) where X = BYTES_COMPRESSION_REJECTED + NUMBER_BLOCK_COMPRESSION_REJECTED * Overall SST compression ratio (compression disabled vs. 
actual): (Y - BYTES_COMPRESSED_TO + BYTES_COMPRESSED_FROM) / Y where Y = COMPACT_WRITE_BYTES + FLUSH_WRITE_BYTES Keeping _REJECTED separate from _BYPASSED helps us to understand "wasted" CPU time in compression. ## BlockBasedTableBuilder Various small refactorings, optimizations, and name clean-ups. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11388 Test Plan: unit tests added * `options_settable_test.cc`: use non-deprecated idiom for configuring CompressionOptions from string. The old idiom is tested elsewhere and does not need to be updated to support the new field. Reviewed By: ajkr Differential Revision: D45128202 Pulled By: pdillinger fbshipit-source-id: 5a652bf5c022b7ec340cf79018cccf0686962803 --- HISTORY.md | 2 + db/blob/blob_file_builder.cc | 1 + db/db_statistics_test.cc | 95 ++++++++++++---- include/rocksdb/advanced_options.h | 88 +++++++-------- include/rocksdb/statistics.h | 37 ++++++- monitoring/statistics.cc | 10 ++ options/cf_options.cc | 4 + options/options_settable_test.cc | 9 +- .../block_based/block_based_table_builder.cc | 103 +++++++++--------- table/format.cc | 4 +- table/sst_file_dumper.cc | 3 +- table/table_test.cc | 66 ++++++++++- test_util/testharness.h | 5 + util/stop_watch.h | 2 + 14 files changed, 304 insertions(+), 125 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1ca433e8ed..ad6b66c999 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Public API Changes * `SstFileWriter::DeleteRange()` now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined. * Add `multi_get_for_update` to C API. +* Remove unnecessary constructor for CompressionOptions. ### Behavior changes * Changed default block cache size from an 8MB to 32MB LRUCache, which increases the default number of cache shards from 16 to 64. This change is intended to minimize cache mutex contention under stress conditions. 
See https://github.com/rocksdb/rocksdb/wiki/Block-Cache for more information. @@ -17,6 +18,7 @@ ### New Features * Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called. * Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()` +* Added statistics tickers BYTES_COMPRESSED_FROM, BYTES_COMPRESSED_TO, BYTES_COMPRESSION_BYPASSED, BYTES_COMPRESSION_REJECTED, NUMBER_BLOCK_COMPRESSION_BYPASSED, and NUMBER_BLOCK_COMPRESSION_REJECTED. Disabled/deprecated histograms BYTES_COMPRESSED and BYTES_DECOMPRESSED, and ticker NUMBER_BLOCK_NOT_COMPRESSED. The new tickers offer more insight into compression ratios, rejected vs. disabled compression, etc. (#11388) * New statistics `rocksdb.file.read.{flush|compaction}.micros` that measure read time of block-based SST tables or blob files during flush or compaction. ### Bug Fixes diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 952a5676bf..21c1e5d417 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -259,6 +259,7 @@ Status BlobFileBuilder::CompressBlobIfNeeded( return Status::OK(); } + // TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb CompressionOptions opts; CompressionContext context(blob_compression_type_); constexpr uint64_t sample_for_compression = 0; diff --git a/db/db_statistics_test.cc b/db/db_statistics_test.cc index 85a54aa948..ed80760518 100644 --- a/db/db_statistics_test.cc +++ b/db/db_statistics_test.cc @@ -49,47 +49,102 @@ TEST_F(DBStatisticsTest, CompressionStatsTest) { options.compression = type; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex); + BlockBasedTableOptions bbto; + bbto.enable_index_compression = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); DestroyAndReopen(options); - int 
kNumKeysWritten = 100000; + auto PopStat = [&](Tickers t) -> uint64_t { + return options.statistics->getAndResetTickerCount(t); + }; + + int kNumKeysWritten = 100; + double compress_to = 0.5; + // About three KVs per block + int len = static_cast<int>(BlockBasedTableOptions().block_size / 3); + int uncomp_est = kNumKeysWritten * (len + 20); + + Random rnd(301); + std::string buf; // Check that compressions occur and are counted when compression is turned on - Random rnd(301); for (int i = 0; i < kNumKeysWritten; ++i) { - // compressible string - ASSERT_OK(Put(Key(i), rnd.RandomString(128) + std::string(128, 'a'))); + ASSERT_OK( + Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf))); } ASSERT_OK(Flush()); - ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED), 0); + EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSED)); + EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSED_FROM), uncomp_est / 10); + EXPECT_NEAR2(uncomp_est * compress_to, PopStat(BYTES_COMPRESSED_TO), + uncomp_est / 10); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO)); + + // And decompressions for (int i = 0; i < kNumKeysWritten; ++i) { auto r = Get(Key(i)); } - ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED), 0); + EXPECT_EQ(34, PopStat(NUMBER_BLOCK_DECOMPRESSED)); + EXPECT_NEAR2(uncomp_est, PopStat(BYTES_DECOMPRESSED_TO), uncomp_est / 10); + EXPECT_NEAR2(uncomp_est * compress_to, PopStat(BYTES_DECOMPRESSED_FROM), + uncomp_est / 10); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_BYPASSED)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_REJECTED)); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED)); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED)); + + // Check when compression is rejected. 
+ compress_to = 0.95; + DestroyAndReopen(options); + + for (int i = 0; i < kNumKeysWritten; ++i) { + ASSERT_OK( + Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf))); + } + ASSERT_OK(Flush()); + for (int i = 0; i < kNumKeysWritten; ++i) { + auto r = Get(Key(i)); + } + EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED)); + EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSION_REJECTED), + uncomp_est / 10); + + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSED)); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED)); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_FROM)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_TO)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_BYPASSED)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO)); + + // Check when compression is disabled. options.compression = kNoCompression; DestroyAndReopen(options); - uint64_t currentCompressions = - options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED); - uint64_t currentDecompressions = - options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED); - // Check that compressions do not occur when turned off for (int i = 0; i < kNumKeysWritten; ++i) { - // compressible string - ASSERT_OK(Put(Key(i), rnd.RandomString(128) + std::string(128, 'a'))); + ASSERT_OK( + Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf))); } ASSERT_OK(Flush()); - ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED) - - currentCompressions, - 0); - for (int i = 0; i < kNumKeysWritten; ++i) { auto r = Get(Key(i)); } - ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED) - - currentDecompressions, - 0); + EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED)); + EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSION_BYPASSED), + uncomp_est / 10); + + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSED)); + EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED)); + 
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_FROM)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_TO)); + EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_REJECTED)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM)); + EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO)); } TEST_F(DBStatisticsTest, MutexWaitStatsDisabledByDefault) { diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 78d8a0e4b8..5862126d0a 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -85,17 +85,28 @@ struct CompactionOptionsFIFO { // Compression options for different compression algorithms like Zlib struct CompressionOptions { + // ==> BEGIN options that can be set by deprecated configuration syntax, <== + // ==> e.g. compression_opts=5:6:7:8:9:10:true:11:false <== + // ==> Please use compression_opts={level=6;strategy=7;} form instead. <== + // RocksDB's generic default compression level. Internally it'll be translated // to the default compression level specific to the library being used (see // comment above `ColumnFamilyOptions::compression`). // // The default value is the max 16-bit int as it'll be written out in OPTIONS // file, which should be portable. - const static int kDefaultCompressionLevel = 32767; + static constexpr int kDefaultCompressionLevel = 32767; - int window_bits; - int level; - int strategy; + // zlib only: windowBits parameter. See https://www.zlib.net/manual.html + int window_bits = -14; + + // Compression "level" applicable to zstd, zlib, LZ4. Except for + // kDefaultCompressionLevel (see above), the meaning of each value depends + // on the compression algorithm. + int level = kDefaultCompressionLevel; + + // zlib only: strategy parameter. See https://www.zlib.net/manual.html + int strategy = 0; // Maximum size of dictionaries used to prime the compression library. 
// Enabling dictionary can improve compression ratios when there are @@ -117,18 +128,14 @@ struct CompressionOptions { // If block cache insertion fails with `Status::MemoryLimit` (i.e., it is // full), we finalize the dictionary with whatever data we have and then stop // buffering. - // - // Default: 0. - uint32_t max_dict_bytes; + uint32_t max_dict_bytes = 0; // Maximum size of training data passed to zstd's dictionary trainer. Using // zstd's dictionary trainer can achieve even better compression ratio // improvements than using `max_dict_bytes` alone. // // The training data will be used to generate a dictionary of max_dict_bytes. - // - // Default: 0. - uint32_t zstd_max_train_bytes; + uint32_t zstd_max_train_bytes = 0; // Number of threads for parallel compression. // Parallel compression is enabled only if threads > 1. @@ -141,9 +148,7 @@ struct CompressionOptions { // compressed size is in flight when compression is parallelized. To be // reasonably accurate, this inflation is also estimated by using historical // compression ratio and current bytes inflight. - // - // Default: 1. - uint32_t parallel_threads; + uint32_t parallel_threads = 1; // When the compression options are set by the user, it will be set to "true". // For bottommost_compression_opts, to enable it, user must set enabled=true. @@ -152,9 +157,7 @@ struct CompressionOptions { // // For compression_opts, if compression_opts.enabled=false, it is still // used as compression options for compression process. - // - // Default: false. - bool enabled; + bool enabled = false; // Limit on data buffering when gathering samples to build a dictionary. Zero // means no limit. When dictionary is disabled (`max_dict_bytes == 0`), @@ -173,9 +176,7 @@ struct CompressionOptions { // `zstd_max_train_bytes` (when enabled) can restrict how many samples we can // pass to the dictionary trainer. Configuring it below `max_dict_bytes` can // restrict the size of the final dictionary. 
- // - // Default: 0 (unlimited) - uint64_t max_dict_buffer_bytes; + uint64_t max_dict_buffer_bytes = 0; // Use zstd trainer to generate dictionaries. When this option is set to true, // zstd_max_train_bytes of training data sampled from max_dict_buffer_bytes @@ -187,34 +188,29 @@ struct CompressionOptions { // data will be passed to this API. Using this API should save CPU time on // dictionary training, but the compression ratio may not be as good as using // a dictionary trainer. - // - // Default: true - bool use_zstd_dict_trainer; + bool use_zstd_dict_trainer = true; - CompressionOptions() - : window_bits(-14), - level(kDefaultCompressionLevel), - strategy(0), - max_dict_bytes(0), - zstd_max_train_bytes(0), - parallel_threads(1), - enabled(false), - max_dict_buffer_bytes(0), - use_zstd_dict_trainer(true) {} - CompressionOptions(int wbits, int _lev, int _strategy, - uint32_t _max_dict_bytes, uint32_t _zstd_max_train_bytes, - uint32_t _parallel_threads, bool _enabled, - uint64_t _max_dict_buffer_bytes, - bool _use_zstd_dict_trainer) - : window_bits(wbits), - level(_lev), - strategy(_strategy), - max_dict_bytes(_max_dict_bytes), - zstd_max_train_bytes(_zstd_max_train_bytes), - parallel_threads(_parallel_threads), - enabled(_enabled), - max_dict_buffer_bytes(_max_dict_buffer_bytes), - use_zstd_dict_trainer(_use_zstd_dict_trainer) {} + // ===> END options that can be set by deprecated configuration syntax <=== + // ===> Use compression_opts={level=6;strategy=7;} form for below opts <=== + + // Essentially specifies a minimum acceptable compression ratio. A block is + // stored uncompressed if the compressed block does not achieve this ratio, + // because the downstream cost of decompression is not considered worth such + // a small savings (if any). + // However, the ratio is specified in a way that is efficient for checking. 
+ // An integer from 1 to 1024 indicates the maximum allowable compressed bytes + // per 1KB of input, so the minimum acceptable ratio is 1024.0 / this value. + // For example, for a minimum ratio of 1.5:1, set to 683. See SetMinRatio(). + // Default: abandon use of compression for a specific block or entry if + // compressed by less than 12.5% (minimum ratio of 1.143:1). + int max_compressed_bytes_per_kb = 1024 * 7 / 8; + + // A convenience function for setting max_compressed_bytes_per_kb based on a + // minimum acceptable compression ratio (uncompressed size over compressed + // size). + void SetMinRatio(double min_ratio) { + max_compressed_bytes_per_kb = static_cast<int>(1024.0 / min_ratio + 0.5); + } }; // Temperature of a file. Used to pass to FileSystem for a different diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index b7a8905ffd..56686d438f 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -201,6 +201,7 @@ enum Tickers : uint32_t { NUMBER_BLOCK_COMPRESSED, NUMBER_BLOCK_DECOMPRESSED, + // DEPRECATED / unused (see NUMBER_BLOCK_COMPRESSION_*) NUMBER_BLOCK_NOT_COMPRESSED, MERGE_OPERATION_TOTAL_TIME, FILTER_OPERATION_TOTAL_TIME, @@ -435,6 +436,36 @@ enum Tickers : uint32_t { // # of times timestamps can successfully help skip the table access TIMESTAMP_FILTER_TABLE_FILTERED, + // Number of input bytes (uncompressed) to compression for SST blocks that + // are stored compressed. + BYTES_COMPRESSED_FROM, + // Number of output bytes (compressed) from compression for SST blocks that + // are stored compressed. + BYTES_COMPRESSED_TO, + // Number of uncompressed bytes for SST blocks that are stored uncompressed + // because compression type is kNoCompression, or some error case caused + // compression not to run or produce an output. Index blocks are only counted + // if enable_index_compression is true. 
+ BYTES_COMPRESSION_BYPASSED, + // Number of input bytes (uncompressed) to compression for SST blocks that + // are stored uncompressed because the compression result was rejected, + // either because the ratio was not acceptable (see + // CompressionOptions::max_compressed_bytes_per_kb) or found invalid by the + // `verify_compression` option. + BYTES_COMPRESSION_REJECTED, + + // Like BYTES_COMPRESSION_BYPASSED but counting number of blocks + NUMBER_BLOCK_COMPRESSION_BYPASSED, + // Like BYTES_COMPRESSION_REJECTED but counting number of blocks + NUMBER_BLOCK_COMPRESSION_REJECTED, + + // Number of input bytes (compressed) to decompression in reading compressed + // SST blocks from storage. + BYTES_DECOMPRESSED_FROM, + // Number of output bytes (uncompressed) from decompression in reading + // compressed SST blocks from storage. + BYTES_DECOMPRESSED_TO, + TICKER_ENUM_MAX }; @@ -486,10 +517,8 @@ enum Histograms : uint32_t { BYTES_PER_WRITE, BYTES_PER_MULTIGET, - // number of bytes compressed/decompressed - // number of bytes is when uncompressed; i.e. 
before/after respectively - BYTES_COMPRESSED, - BYTES_DECOMPRESSED, + BYTES_COMPRESSED, // DEPRECATED / unused (see BYTES_COMPRESSED_{FROM,TO}) + BYTES_DECOMPRESSED, // DEPRECATED / unused (see BYTES_DECOMPRESSED_{FROM,TO}) COMPRESSION_TIMES_NANOS, DECOMPRESSION_TIMES_NANOS, // Number of merge operands passed to the merge operator in user read diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 90e3fbda7c..6d160484e3 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -222,6 +222,16 @@ const std::vector> TickersNameMap = { {TIMESTAMP_FILTER_TABLE_CHECKED, "rocksdb.timestamp.filter.table.checked"}, {TIMESTAMP_FILTER_TABLE_FILTERED, "rocksdb.timestamp.filter.table.filtered"}, + {BYTES_COMPRESSED_FROM, "rocksdb.bytes.compressed.from"}, + {BYTES_COMPRESSED_TO, "rocksdb.bytes.compressed.to"}, + {BYTES_COMPRESSION_BYPASSED, "rocksdb.bytes.compression_bypassed"}, + {BYTES_COMPRESSION_REJECTED, "rocksdb.bytes.compression.rejected"}, + {NUMBER_BLOCK_COMPRESSION_BYPASSED, + "rocksdb.number.block_compression_bypassed"}, + {NUMBER_BLOCK_COMPRESSION_REJECTED, + "rocksdb.number.block_compression_rejected"}, + {BYTES_DECOMPRESSED_FROM, "rocksdb.bytes.decompressed.from"}, + {BYTES_DECOMPRESSED_TO, "rocksdb.bytes.decompressed.to"}, }; const std::vector> HistogramsNameMap = { diff --git a/options/cf_options.cc b/options/cf_options.cc index 14a7105e96..2057e300a2 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -147,6 +147,10 @@ static std::unordered_map {"strategy", {offsetof(struct CompressionOptions, strategy), OptionType::kInt, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"max_compressed_bytes_per_kb", + {offsetof(struct CompressionOptions, max_compressed_bytes_per_kb), + OptionType::kInt, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"max_dict_bytes", {offsetof(struct CompressionOptions, max_dict_bytes), OptionType::kInt, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, 
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 5f58694e1d..c772c786c9 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -497,8 +497,13 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "max_bytes_for_level_multiplier=60;" "memtable_factory=SkipListFactory;" "compression=kNoCompression;" - "compression_opts=5:6:7:8:9:10:true:11:false;" - "bottommost_compression_opts=4:5:6:7:8:9:true:10:true;" + "compression_opts={max_dict_buffer_bytes=5;use_zstd_dict_trainer=true;" + "enabled=false;parallel_threads=6;zstd_max_train_bytes=7;strategy=8;max_" + "dict_bytes=9;level=10;window_bits=11;max_compressed_bytes_per_kb=987;};" + "bottommost_compression_opts={max_dict_buffer_bytes=4;use_zstd_dict_" + "trainer=true;enabled=true;parallel_threads=5;zstd_max_train_bytes=6;" + "strategy=7;max_dict_bytes=8;level=9;window_bits=10;max_compressed_bytes_" + "per_kb=876;};" "bottommost_compression=kDisableCompressionOption;" "level0_stop_writes_trigger=33;" "num_levels=99;" diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 3a8731365a..2a8e44d1cb 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -104,9 +104,12 @@ FilterBlockBuilder* CreateFilterBlockBuilder( } } -bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) { - // Check to see if compressed less than 12.5% - return compressed_size < uncomp_size - (uncomp_size / 8u); +bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size, + int max_compressed_bytes_per_kb) { + // For efficiency, avoid floating point and division + return compressed_size <= + (static_cast<uint64_t>(max_compressed_bytes_per_kb) * uncomp_size) >> + 10; } } // namespace @@ -114,7 +117,7 @@ bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) { // format_version is the block format as defined in 
include/rocksdb/table.h Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info, CompressionType* type, uint32_t format_version, - bool do_sample, std::string* compressed_output, + bool allow_sample, std::string* compressed_output, std::string* sampled_output_fast, std::string* sampled_output_slow) { assert(type); @@ -126,7 +129,7 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info, // The users can use these stats to decide if it is worthwhile // enabling compression and they also get a hint about which // compression algorithm wil be beneficial. - if (do_sample && info.SampleForCompression() && + if (allow_sample && info.SampleForCompression() && Random::GetTLSInstance()->OneIn( static_cast(info.SampleForCompression()))) { // Sampling with a fast compression algorithm @@ -159,7 +162,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info, } } - if (info.type() == kNoCompression) { + int max_compressed_bytes_per_kb = info.options().max_compressed_bytes_per_kb; + if (info.type() == kNoCompression || max_compressed_bytes_per_kb <= 0) { *type = kNoCompression; return uncompressed_data; } @@ -175,8 +179,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info, // Check the compression ratio; if it's not good enough, just fall back to // uncompressed - if (!GoodCompressionRatio(compressed_output->size(), - uncompressed_data.size())) { + if (!GoodCompressionRatio(compressed_output->size(), uncompressed_data.size(), + max_compressed_bytes_per_kb)) { *type = kNoCompression; return uncompressed_data; } @@ -1108,25 +1112,17 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( const CompressionContext& compression_ctx, UncompressionContext* verify_ctx, std::string* compressed_output, Slice* block_contents, CompressionType* type, Status* out_status) { - // File format contains a sequence of blocks where each block has: - // block_data: uint8[n] - // type: uint8 - 
// crc: uint32 Rep* r = rep_; bool is_status_ok = ok(); if (!r->IsParallelCompressionEnabled()) { assert(is_status_ok); } - *type = r->compression_type; - uint64_t sample_for_compression = r->sample_for_compression; - bool abort_compression = false; - - StopWatchNano timer( - r->ioptions.clock, - ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)); - if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) { + StopWatchNano timer( + r->ioptions.clock, + ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)); + if (is_data_block) { r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(), std::memory_order_relaxed); @@ -1139,14 +1135,14 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( } assert(compression_dict != nullptr); CompressionInfo compression_info(r->compression_opts, compression_ctx, - *compression_dict, *type, - sample_for_compression); + *compression_dict, r->compression_type, + r->sample_for_compression); std::string sampled_output_fast; std::string sampled_output_slow; *block_contents = CompressBlock( uncompressed_block_data, compression_info, type, - r->table_options.format_version, is_data_block /* do_sample */, + r->table_options.format_version, is_data_block /* allow_sample */, compressed_output, &sampled_output_fast, &sampled_output_slow); if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) { @@ -1179,35 +1175,38 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( BlockContents contents; UncompressionInfo uncompression_info(*verify_ctx, *verify_dict, r->compression_type); - Status stat = UncompressBlockData( + Status uncompress_status = UncompressBlockData( uncompression_info, block_contents->data(), block_contents->size(), &contents, r->table_options.format_version, r->ioptions); - if (stat.ok()) { - bool compressed_ok = - contents.data.compare(uncompressed_block_data) == 0; - if (!compressed_ok) { + if (uncompress_status.ok()) { + bool data_match = 
contents.data.compare(uncompressed_block_data) == 0; + if (!data_match) { // The result of the compression was invalid. abort. - abort_compression = true; const char* const msg = "Decompressed block did not match pre-compression block"; ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg); *out_status = Status::Corruption(msg); + *type = kNoCompression; } } else { // Decompression reported an error. abort. *out_status = Status::Corruption(std::string("Could not decompress: ") + - stat.getState()); - abort_compression = true; + uncompress_status.getState()); + *type = kNoCompression; } } + if (timer.IsStarted()) { + RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS, + timer.ElapsedNanos()); + } } else { - // Block is too big to be compressed. + // Status is not OK, or block is too big to be compressed. if (is_data_block) { r->uncompressible_input_data_bytes.fetch_add( uncompressed_block_data.size(), std::memory_order_relaxed); } - abort_compression = true; + *type = kNoCompression; } if (is_data_block) { r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize, @@ -1216,26 +1215,32 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( // Abort compression if the block is too big, or did not pass // verification. - if (abort_compression) { - RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED); - *type = kNoCompression; + if (*type == kNoCompression) { *block_contents = uncompressed_block_data; - } else if (*type != kNoCompression) { - if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) { - RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS, - timer.ElapsedNanos()); - } - RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED, - uncompressed_block_data.size()); + bool compression_attempted = !compressed_output->empty(); + RecordTick(r->ioptions.stats, compression_attempted + ? NUMBER_BLOCK_COMPRESSION_REJECTED + : NUMBER_BLOCK_COMPRESSION_BYPASSED); + RecordTick(r->ioptions.stats, + compression_attempted ? 
BYTES_COMPRESSION_REJECTED + : BYTES_COMPRESSION_BYPASSED, + uncompressed_block_data.size()); + } else { RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED); - } else if (*type != r->compression_type) { - RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED); + RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM, + uncompressed_block_data.size()); + RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO, + compressed_output->size()); } } void BlockBasedTableBuilder::WriteMaybeCompressedBlock( - const Slice& block_contents, CompressionType type, BlockHandle* handle, + const Slice& block_contents, CompressionType comp_type, BlockHandle* handle, BlockType block_type, const Slice* uncompressed_block_data) { + // File format contains a sequence of blocks where each block has: + // block_data: uint8[n] + // compression_type: uint8 + // checksum: uint32 Rep* r = rep_; bool is_data_block = block_type == BlockType::kData; // Old, misleading name of this function: WriteRawBlock @@ -1246,7 +1251,7 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock( assert(io_status().ok()); if (uncompressed_block_data == nullptr) { uncompressed_block_data = &block_contents; - assert(type == kNoCompression); + assert(comp_type == kNoCompression); } { @@ -1258,10 +1263,10 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock( } std::array trailer; - trailer[0] = type; + trailer[0] = comp_type; uint32_t checksum = ComputeBuiltinChecksumWithLastByte( r->table_options.checksum, block_contents.data(), block_contents.size(), - /*last_byte*/ type); + /*last_byte*/ comp_type); if (block_type == BlockType::kFilter) { Status s = r->filter_builder->MaybePostVerifyFilter(block_contents); diff --git a/table/format.cc b/table/format.cc index b8785b1135..7d709a217b 100644 --- a/table/format.cc +++ b/table/format.cc @@ -530,8 +530,8 @@ Status UncompressBlockData(const UncompressionInfo& uncompression_info, RecordTimeToHistogram(ioptions.stats, DECOMPRESSION_TIMES_NANOS, timer.ElapsedNanos()); } 
- RecordTimeToHistogram(ioptions.stats, BYTES_DECOMPRESSED, - out_contents->data.size()); + RecordTick(ioptions.stats, BYTES_DECOMPRESSED_FROM, size); + RecordTick(ioptions.stats, BYTES_DECOMPRESSED_TO, out_contents->data.size()); RecordTick(ioptions.stats, NUMBER_BLOCK_DECOMPRESSED); TEST_SYNC_POINT_CALLBACK("UncompressBlockData:TamperWithReturnValue", diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index e9916eb5b0..fa3e5b47da 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -315,7 +315,8 @@ Status SstFileDumper::ShowCompressionSize( const uint64_t compressed_blocks = opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_COMPRESSED); const uint64_t not_compressed_blocks = - opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_NOT_COMPRESSED); + opts.statistics->getAndResetTickerCount( + NUMBER_BLOCK_COMPRESSION_REJECTED); // When the option enable_index_compression is true, // NUMBER_BLOCK_COMPRESSED is incremented for index block(s). if ((compressed_blocks + not_compressed_blocks) > num_data_blocks) { diff --git a/table/table_test.cc b/table/table_test.cc index a701eda01a..8f0f4b1f1a 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1718,7 +1718,8 @@ TEST_P(BlockBasedTableTest, BasicBlockBasedTableProperties) { MutableCFOptions moptions(options); c.Finish(options, ioptions, moptions, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); - ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_NOT_COMPRESSED), 0); + ASSERT_EQ( + options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSION_REJECTED), 0); auto& props = *c.GetTableReader()->GetTableProperties(); ASSERT_EQ(kvmap.size(), props.num_entries); @@ -5079,6 +5080,69 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { } } +TEST_P(BlockBasedTableTest, CompressionRatioThreshold) { + Options options; + if (Snappy_Supported()) { + options.compression = kSnappyCompression; + fprintf(stderr, "using snappy\n"); + } else if 
(Zlib_Supported()) { + options.compression = kZlibCompression; + fprintf(stderr, "using zlib\n"); + } else if (BZip2_Supported()) { + options.compression = kBZip2Compression; + fprintf(stderr, "using bzip2\n"); + } else if (LZ4_Supported()) { + options.compression = kLZ4Compression; + fprintf(stderr, "using lz4\n"); + } else if (XPRESS_Supported()) { + options.compression = kXpressCompression; + fprintf(stderr, "using xpress\n"); + } else if (ZSTD_Supported()) { + options.compression = kZSTD; + fprintf(stderr, "using ZSTD\n"); + } else { + fprintf(stderr, "skipping test, compression disabled\n"); + return; + } + + BlockBasedTableOptions table_options = GetBlockBasedTableOptions(); + int len = 10000; + Random rnd(301); + std::vector keys; + stl_wrappers::KVMap kvmap; + + // Test the max_compressed_bytes_per_kb option + for (int threshold : {0, 1, 100, 400, 600, 900, 1024}) { + SCOPED_TRACE("threshold=" + std::to_string(threshold)); + options.compression_opts.max_compressed_bytes_per_kb = threshold; + ImmutableOptions ioptions(options); + MutableCFOptions moptions(options); + + for (double compressible_to : {0.25, 0.75}) { + SCOPED_TRACE("compressible_to=" + std::to_string(compressible_to)); + TableConstructor c(BytewiseComparator(), + true /* convert_to_internal_key_ */); + std::string buf; + c.Add("x", test::CompressibleString(&rnd, compressible_to, len, &buf)); + + // write an SST file + c.Finish(options, ioptions, moptions, table_options, + GetPlainInternalComparator(options.comparator), &keys, &kvmap); + + size_t table_file_size = c.TEST_GetSink()->contents().size(); + size_t approx_sst_overhead = 1000; + if (compressible_to < threshold / 1024.0) { + // Should be compressed + EXPECT_NEAR2(len * compressible_to + approx_sst_overhead, + table_file_size, len / 10); + } else { + // Should not be compressed + EXPECT_NEAR2(len + approx_sst_overhead, table_file_size, len / 10); + } + } + } +} + TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { // The properties 
meta-block should come at the end since we always need to // read it when opening a file, unlike index/filter/other meta-blocks, which diff --git a/test_util/testharness.h b/test_util/testharness.h index 69018629a5..d8b6c9679c 100644 --- a/test_util/testharness.h +++ b/test_util/testharness.h @@ -51,6 +51,11 @@ GTEST_SUCCESS_("BYPASSED: " m); \ } while (false) /* user ; */ +// Avoid "loss of precision" warnings when passing in 64-bit integers +#define EXPECT_NEAR2(val1, val2, abs_error) \ + EXPECT_NEAR(static_cast<double>(val1), static_cast<double>(val2), \ + static_cast<double>(abs_error)) + #include #include "port/stack_trace.h" diff --git a/util/stop_watch.h b/util/stop_watch.h index 0ecd1bb118..c2ffeeccc7 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -126,6 +126,8 @@ class StopWatchNano { return (clock_ != nullptr) ? ElapsedNanos(reset) : 0U; } + bool IsStarted() { return start_ != 0; } + private: SystemClock* clock_; uint64_t start_;