diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 94bcd1b9f2..59c385d65a 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -103,19 +103,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { } // namespace // format_version is the block format as defined in include/rocksdb/table.h -Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, +Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, std::string* compressed_output) { - *type = compression_info.type(); - if (compression_info.type() == kNoCompression) { + *type = compression_ctx.type(); + if (compression_ctx.type() == kNoCompression) { return raw; } // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". - switch (compression_info.type()) { + switch (compression_ctx.type()) { case kSnappyCompression: - if (Snappy_Compress(compression_info, raw.data(), raw.size(), + if (Snappy_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -123,7 +123,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, break; // fall back to no compression. case kZlibCompression: if (Zlib_Compress( - compression_info, + compression_ctx, GetCompressFormatForVersion(kZlibCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -132,7 +132,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, break; // fall back to no compression. 
case kBZip2Compression: if (BZip2_Compress( - compression_info, + compression_ctx, GetCompressFormatForVersion(kBZip2Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -141,7 +141,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, break; // fall back to no compression. case kLZ4Compression: if (LZ4_Compress( - compression_info, + compression_ctx, GetCompressFormatForVersion(kLZ4Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -150,7 +150,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, break; // fall back to no compression. case kLZ4HCCompression: if (LZ4HC_Compress( - compression_info, + compression_ctx, GetCompressFormatForVersion(kLZ4HCCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -166,7 +166,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, break; case kZSTD: case kZSTDNotFinalCompression: - if (ZSTD_Compress(compression_info, raw.data(), raw.size(), + if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -260,9 +260,8 @@ struct BlockBasedTableBuilder::Rep { PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; - CompressionType compression_type; - CompressionOptions compression_opts; - CompressionDict compression_dict; + // Compression dictionary or nullptr + const std::string* compression_dict; CompressionContext compression_ctx; std::unique_ptr verify_ctx; TableProperties props; @@ -313,9 +312,8 @@ struct BlockBasedTableBuilder::Rep { table_options.data_block_hash_table_util_ratio), range_del_block(1 /* block_restart_interval */), 
internal_prefix_transform(_moptions.prefix_extractor.get()), - compression_type(_compression_type), - compression_opts(_compression_opts), - compression_ctx(_compression_type), + compression_dict(_compression_dict), + compression_ctx(_compression_type, _compression_opts), use_delta_encoding_for_index_values(table_opt.format_version >= 4 && !table_opt.block_align), compressed_cache_key_prefix_size(0), @@ -326,15 +324,6 @@ struct BlockBasedTableBuilder::Rep { column_family_name(_column_family_name), creation_time(_creation_time), oldest_key_time(_oldest_key_time) { - if (_compression_dict != nullptr) { - compression_dict.Init(*_compression_dict, - CompressionDict::Mode::kCompression, - _compression_type, _compression_opts.level); - } else { - compression_dict.Init(Slice() /* dict */, - CompressionDict::Mode::kEmpty, - _compression_type, _compression_opts.level); - } if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -365,7 +354,7 @@ struct BlockBasedTableBuilder::Rep { _moptions.prefix_extractor != nullptr)); if (table_options.verify_compression) { verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), - compression_type)); + compression_ctx.type())); } } @@ -509,7 +498,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, assert(ok()); Rep* r = rep_; - auto type = r->compression_type; + auto type = r->compression_ctx.type(); Slice block_contents; bool abort_compression = false; @@ -517,12 +506,24 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); if (raw_block_contents.size() < kCompressionSizeLimit) { - CompressionInfo compression_info( - r->compression_opts, r->compression_ctx, - is_data_block ? 
r->compression_dict : CompressionDict::GetEmptyDict(), - r->compression_type); + Slice compression_dict; + if (is_data_block && r->compression_dict && r->compression_dict->size()) { + r->compression_ctx.dict() = *r->compression_dict; + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = *r->compression_dict; + } + } else { + // Clear dictionary + r->compression_ctx.dict() = Slice(); + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = Slice(); + } + } + block_contents = - CompressBlock(raw_block_contents, compression_info, &type, + CompressBlock(raw_block_contents, r->compression_ctx, &type, r->table_options.format_version, &r->compressed_output); // Some of the compression algorithms are known to be unreliable. If @@ -531,12 +532,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer BlockContents contents; - UncompressionInfo uncompression_info( - *r->verify_ctx, - is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(), - r->compression_type); Status stat = UncompressBlockContentsForCompressionType( - uncompression_info, block_contents.data(), block_contents.size(), + *r->verify_ctx, block_contents.data(), block_contents.size(), &contents, r->table_options.format_version, r->ioptions); if (stat.ok()) { @@ -783,7 +780,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock( ? rep_->ioptions.merge_operator->Name() : "nullptr"; rep_->props.compression_name = - CompressionTypeToString(rep_->compression_type); + CompressionTypeToString(rep_->compression_ctx.type()); rep_->props.prefix_extractor_name = rep_->moptions.prefix_extractor != nullptr ? 
rep_->moptions.prefix_extractor->Name() @@ -832,10 +829,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock( void BlockBasedTableBuilder::WriteCompressionDictBlock( MetaIndexBuilder* meta_index_builder) { - if (rep_->compression_dict.GetRawDict().size()) { + if (rep_->compression_dict && rep_->compression_dict->size()) { BlockHandle compression_dict_block_handle; if (ok()) { - WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression, + WriteRawBlock(*rep_->compression_dict, kNoCompression, &compression_dict_block_handle); } if (ok()) { diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 45e8e37e40..eba0bb7c13 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -131,7 +131,7 @@ class BlockBasedTableBuilder : public TableBuilder { const uint64_t kCompressionSizeLimit = std::numeric_limits::max(); }; -Slice CompressBlock(const Slice& raw, const CompressionInfo& info, +Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, std::string* compressed_output); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index dc8a46cdc1..9f2e02d680 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1226,12 +1226,9 @@ Status BlockBasedTable::GetDataBlockFromCache( // Retrieve the uncompressed contents into a new buffer BlockContents contents; - UncompressionContext context(compressed_block->compression_type()); - CompressionDict dict; - dict.Init(compression_dict, CompressionDict::Mode::kUncompression, - compressed_block->compression_type()); - UncompressionInfo info(context, dict, compressed_block->compression_type()); - s = UncompressBlockContents(info, compressed_block->data(), + UncompressionContext uncompresssion_ctx(compressed_block->compression_type(), + compression_dict); + s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(), 
compressed_block->size(), &contents, format_version, ioptions); @@ -1304,13 +1301,11 @@ Status BlockBasedTable::PutDataBlockToCache( BlockContents contents; Statistics* statistics = ioptions.statistics; if (raw_block->compression_type() != kNoCompression) { - UncompressionContext context(raw_block->compression_type()); - CompressionDict dict; - dict.Init(compression_dict, CompressionDict::Mode::kUncompression, - raw_block->compression_type()); - UncompressionInfo info(context, dict, raw_block->compression_type()); - s = UncompressBlockContents(info, raw_block->data(), raw_block->size(), - &contents, format_version, ioptions); + UncompressionContext uncompression_ctx(raw_block->compression_type(), + compression_dict); + s = UncompressBlockContents(uncompression_ctx, raw_block->data(), + raw_block->size(), &contents, format_version, + ioptions); } if (!s.ok()) { delete raw_block; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 4b68b09e12..ea97066ec4 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -227,13 +227,10 @@ Status BlockFetcher::ReadBlockContents() { if (do_uncompress_ && compression_type != kNoCompression) { // compressed page, uncompress, update cache - UncompressionContext context(compression_type); - CompressionDict dict; - dict.Init(compression_dict_, CompressionDict::Mode::kUncompression, - compression_type); - UncompressionInfo info(context, dict, compression_type); - status_ = UncompressBlockContents(info, slice_.data(), block_size_, - contents_, footer_.version(), ioptions_); + UncompressionContext uncompression_ctx(compression_type, compression_dict_); + status_ = + UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_, + contents_, footer_.version(), ioptions_); } else { GetBlockContents(); } diff --git a/table/format.cc b/table/format.cc index 6fd9775f99..a4e448870b 100644 --- a/table/format.cc +++ b/table/format.cc @@ -284,18 +284,18 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, } 
Status UncompressBlockContentsForCompressionType( - const UncompressionInfo& uncompression_info, const char* data, size_t n, + const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, uint32_t format_version, const ImmutableCFOptions& ioptions) { std::unique_ptr ubuf; - assert(uncompression_info.type() != kNoCompression && + assert(uncompression_ctx.type() != kNoCompression && "Invalid compression type"); StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); int decompress_size = 0; - switch (uncompression_info.type()) { + switch (uncompression_ctx.type()) { case kSnappyCompression: { size_t ulength = 0; static char snappy_corrupt_msg[] = @@ -312,7 +312,7 @@ Status UncompressBlockContentsForCompressionType( } case kZlibCompression: ubuf.reset(Zlib_Uncompress( - uncompression_info, data, n, &decompress_size, + uncompression_ctx, data, n, &decompress_size, GetCompressFormatForVersion(kZlibCompression, format_version))); if (!ubuf) { static char zlib_corrupt_msg[] = @@ -336,7 +336,7 @@ Status UncompressBlockContentsForCompressionType( break; case kLZ4Compression: ubuf.reset(LZ4_Uncompress( - uncompression_info, data, n, &decompress_size, + uncompression_ctx, data, n, &decompress_size, GetCompressFormatForVersion(kLZ4Compression, format_version))); if (!ubuf) { static char lz4_corrupt_msg[] = @@ -348,7 +348,7 @@ Status UncompressBlockContentsForCompressionType( break; case kLZ4HCCompression: ubuf.reset(LZ4_Uncompress( - uncompression_info, data, n, &decompress_size, + uncompression_ctx, data, n, &decompress_size, GetCompressFormatForVersion(kLZ4HCCompression, format_version))); if (!ubuf) { static char lz4hc_corrupt_msg[] = @@ -370,8 +370,7 @@ Status UncompressBlockContentsForCompressionType( break; case kZSTD: case kZSTDNotFinalCompression: - ubuf.reset( - ZSTD_Uncompress(uncompression_info, data, n, &decompress_size)); + ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, 
&decompress_size)); if (!ubuf) { static char zstd_corrupt_msg[] = "ZSTD not supported or corrupted ZSTD compressed block contents"; @@ -401,14 +400,14 @@ Status UncompressBlockContentsForCompressionType( // buffer is returned via 'result' and it is upto the caller to // free this buffer. // format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const UncompressionInfo& uncompression_info, +Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, uint32_t format_version, const ImmutableCFOptions& ioptions) { assert(data[n] != kNoCompression); - assert(data[n] == uncompression_info.type()); + assert(data[n] == uncompression_ctx.type()); return UncompressBlockContentsForCompressionType( - uncompression_info, data, n, contents, format_version, ioptions); + uncompression_ctx, data, n, contents, format_version, ioptions); } } // namespace rocksdb diff --git a/table/format.h b/table/format.h index 80a318f7b6..ebc9c25397 100644 --- a/table/format.h +++ b/table/format.h @@ -250,17 +250,16 @@ extern Status ReadBlockContents( // free this buffer. // For description of compress_format_version and possible values, see // util/compression.h -extern Status UncompressBlockContents(const UncompressionInfo& info, - const char* data, size_t n, - BlockContents* contents, - uint32_t compress_format_version, - const ImmutableCFOptions& ioptions); +extern Status UncompressBlockContents( + const UncompressionContext& uncompression_ctx, const char* data, size_t n, + BlockContents* contents, uint32_t compress_format_version, + const ImmutableCFOptions& ioptions); // This is an extension to UncompressBlockContents that accepts // a specific compression type. This is used by un-wrapped blocks // with no compression header. 
extern Status UncompressBlockContentsForCompressionType( - const UncompressionInfo& info, const char* data, size_t n, + const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, uint32_t compress_format_version, const ImmutableCFOptions& ioptions); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 8096a3427e..c32822c8f1 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1991,28 +1991,28 @@ class Benchmark { return true; } - inline bool CompressSlice(const CompressionInfo& compression_info, + inline bool CompressSlice(const CompressionContext& compression_ctx, const Slice& input, std::string* compressed) { bool ok = true; switch (FLAGS_compression_type_e) { case rocksdb::kSnappyCompression: - ok = Snappy_Compress(compression_info, input.data(), input.size(), + ok = Snappy_Compress(compression_ctx, input.data(), input.size(), compressed); break; case rocksdb::kZlibCompression: - ok = Zlib_Compress(compression_info, 2, input.data(), input.size(), + ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kBZip2Compression: - ok = BZip2_Compress(compression_info, 2, input.data(), input.size(), + ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4Compression: - ok = LZ4_Compress(compression_info, 2, input.data(), input.size(), + ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4HCCompression: - ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(), + ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kXpressCompression: @@ -2020,7 +2020,7 @@ class Benchmark { input.size(), compressed); break; case rocksdb::kZSTD: - ok = ZSTD_Compress(compression_info, input.data(), input.size(), + ok = ZSTD_Compress(compression_ctx, input.data(), input.size(), compressed); break; default: @@ 
-2103,11 +2103,10 @@ class Benchmark { const int len = FLAGS_block_size; std::string input_str(len, 'y'); std::string compressed; - CompressionOptions opts; - CompressionContext context(FLAGS_compression_type_e); - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); - bool result = CompressSlice(info, Slice(input_str), &compressed); + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); + bool result = + CompressSlice(compression_ctx, Slice(input_str), &compressed); if (!result) { fprintf(stdout, "WARNING: %s compression is not enabled\n", @@ -2957,14 +2956,13 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t produced = 0; bool ok = true; std::string compressed; - CompressionOptions opts; - CompressionContext context(FLAGS_compression_type_e); - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); + // Compress 1G while (ok && bytes < int64_t(1) << 30) { compressed.clear(); - ok = CompressSlice(info, input, &compressed); + ok = CompressSlice(compression_ctx, input, &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); @@ -2986,17 +2984,11 @@ void VerifyDBFromDB(std::string& truth_db_name) { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; - CompressionContext compression_ctx(FLAGS_compression_type_e); - CompressionOptions compression_opts; - CompressionInfo compression_info(compression_opts, compression_ctx, - CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); UncompressionContext uncompression_ctx(FLAGS_compression_type_e); - UncompressionInfo uncompression_info(uncompression_ctx, - CompressionDict::GetEmptyDict(), - FLAGS_compression_type_e); + CompressionContext compression_ctx(FLAGS_compression_type_e, + 
Options().compression_opts); - bool ok = CompressSlice(compression_info, input, &compressed); + bool ok = CompressSlice(compression_ctx, input, &compressed); int64_t bytes = 0; int decompress_size; while (ok && bytes < 1024 * 1048576) { @@ -3016,7 +3008,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { break; } case rocksdb::kZlibCompression: - uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(), + uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -3026,12 +3018,12 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kLZ4Compression: - uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; case rocksdb::kLZ4HCCompression: - uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -3041,7 +3033,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kZSTD: - uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(), + uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(), compressed.size(), &decompress_size); ok = uncompressed != nullptr; break; diff --git a/util/compression.h b/util/compression.h index 21822f0204..e918e14fbe 100644 --- a/util/compression.h +++ b/util/compression.h @@ -133,147 +133,13 @@ class ZSTDUncompressCachedData { namespace rocksdb { -// Holds dictionary and related data, like ZSTD's digested dictionary. 
-struct CompressionDict { - enum class Mode { - kUninit, - kEmpty, // An empty one can be used for both compression and uncompression - kCompression, - kUncompression, - }; -#if ZSTD_VERSION_NUMBER >= 700 - union { - ZSTD_CDict* zstd_cdict_; - ZSTD_DDict* zstd_ddict_; - }; -#endif // ZSTD_VERSION_NUMBER >= 700 - Mode mode_ = Mode::kUninit; - Slice dict_; - - public: - static const CompressionDict& GetEmptyDict() { - static CompressionDict empty_dict{}; - static bool init = false; - if (!init) { - empty_dict.Init(Slice() /* dict */, Mode::kEmpty, - false /* use_zstd_trainer */); - init = true; - } - return empty_dict; - } - - void Init(Slice dict, Mode mode, CompressionType type, int level = -1) { - return Init(dict, mode, type == kZSTD || type == kZSTDNotFinalCompression, - level); - } - - private: -#if ZSTD_VERSION_NUMBER >= 700 - void Init(Slice dict, Mode mode, bool use_zstd_trainer, int level = -1) { -#else // ZSTD_VERSION_NUMBER >= 700 - void Init(Slice dict, Mode mode, bool /* use_zstd_trainer */, - int /*level*/ = -1) { -#endif // ZSTD_VERSION_NUMBER >= 700 - assert(mode_ == Mode::kUninit); - dict_ = std::move(dict); - mode_ = mode; - switch (mode) { - case Mode::kUninit: - assert(false); - break; - case Mode::kEmpty: - break; - case Mode::kCompression: -#if ZSTD_VERSION_NUMBER >= 700 - zstd_cdict_ = nullptr; - if (!dict_.empty() && use_zstd_trainer) { - if (level == CompressionOptions::kDefaultCompressionLevel) { - // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see - // https://github.com/facebook/zstd/issues/1148 - level = 3; - } - // Should be safe (but slower) if below call fails as we'll use the - // raw dictionary to compress. 
- zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level); - assert(zstd_cdict_ != nullptr); - } -#endif // ZSTD_VERSION_NUMBER >= 700 - break; - case Mode::kUncompression: -#if ZSTD_VERSION_NUMBER >= 700 - zstd_ddict_ = nullptr; - if (!dict_.empty() && use_zstd_trainer) { - zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size()); - assert(zstd_ddict_ != nullptr); - } -#endif // ZSTD_VERSION_NUMBER >= 700 - break; - } - } - - public: - ~CompressionDict() { -#if ZSTD_VERSION_NUMBER >= 700 - size_t res = 0; - switch (mode_) { - case Mode::kUninit: - break; - case Mode::kEmpty: - break; - case Mode::kCompression: - if (zstd_cdict_ != nullptr) { - res = ZSTD_freeCDict(zstd_cdict_); - } - break; - case Mode::kUncompression: - if (zstd_ddict_ != nullptr) { - res = ZSTD_freeDDict(zstd_ddict_); - } - break; - } - assert(res == 0); // Last I checked they can't fail - (void)res; // prevent unused var warning -#endif // ZSTD_VERSION_NUMBER >= 700 - } - -#if ZSTD_VERSION_NUMBER >= 700 - const ZSTD_CDict* GetDigestedZstdCDict() const { - assert(mode_ != Mode::kUninit); - if (mode_ == Mode::kEmpty) { - return nullptr; - } - assert(mode_ == Mode::kCompression); - return zstd_cdict_; - } - - const ZSTD_DDict* GetDigestedZstdDDict() const { - assert(mode_ != Mode::kUninit); - if (mode_ == Mode::kEmpty) { - return nullptr; - } - assert(mode_ == Mode::kUncompression); - return zstd_ddict_; - } -#endif // ZSTD_VERSION_NUMBER >= 700 - - Slice GetRawDict() const { - assert(mode_ != Mode::kUninit); - assert(mode_ != Mode::kEmpty || dict_.empty()); - return dict_; - } - - CompressionDict() = default; - // Disable copy/move - CompressionDict(const CompressionDict&) = delete; - CompressionDict& operator=(const CompressionDict&) = delete; - CompressionDict(CompressionDict&&) = delete; - CompressionDict& operator=(CompressionDict&&) = delete; -}; - +// Instantiate this class and pass it to the compression API below class CompressionContext { -#if defined(ZSTD) && 
(ZSTD_VERSION_NUMBER >= 500) private: const CompressionType type_; + const CompressionOptions opts_; + Slice dict_; +#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500) ZSTD_CCtx* zstd_ctx_ = nullptr; void CreateNativeContext() { if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { @@ -297,45 +163,35 @@ class CompressionContext { assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression); return zstd_ctx_; } - - explicit CompressionContext(CompressionType comp_type) : type_(comp_type) { - CreateNativeContext(); - } - #else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) - public: - explicit CompressionContext(CompressionType /* comp_type */) {} private: void CreateNativeContext() {} void DestroyNativeContext() {} #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500) public: + explicit CompressionContext(CompressionType comp_type) : type_(comp_type) { + CreateNativeContext(); + } + CompressionContext(CompressionType comp_type, const CompressionOptions& opts, + const Slice& comp_dict = Slice()) + : type_(comp_type), opts_(opts), dict_(comp_dict) { + CreateNativeContext(); + } ~CompressionContext() { DestroyNativeContext(); } CompressionContext(const CompressionContext&) = delete; CompressionContext& operator=(const CompressionContext&) = delete; -}; - -class CompressionInfo { - const CompressionOptions& opts_; - const CompressionContext& context_; - const CompressionDict& dict_; - const CompressionType type_; - - public: - CompressionInfo(const CompressionOptions& _opts, - const CompressionContext& _context, - const CompressionDict& _dict, CompressionType _type) - : opts_(_opts), context_(_context), dict_(_dict), type_(_type) {} const CompressionOptions& options() const { return opts_; } - const CompressionContext& context() const { return context_; } - const CompressionDict& dict() const { return dict_; } CompressionType type() const { return type_; } + const Slice& dict() const { return dict_; } + Slice& dict() { return dict_; } }; +// Instantiate this class and pass it to 
the uncompression API below class UncompressionContext { private: - const CompressionType type_; + CompressionType type_; + Slice dict_; CompressionContextCache* ctx_cache_ = nullptr; ZSTDUncompressCachedData uncomp_cached_data_; @@ -343,8 +199,10 @@ class UncompressionContext { struct NoCache {}; // Do not use context cache, used by TableBuilder UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {} - - explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) { + explicit UncompressionContext(CompressionType comp_type) + : UncompressionContext(comp_type, Slice()) {} + UncompressionContext(CompressionType comp_type, const Slice& comp_dict) + : type_(comp_type), dict_(comp_dict) { if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { ctx_cache_ = CompressionContextCache::Instance(); uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); @@ -364,21 +222,9 @@ class UncompressionContext { ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { return uncomp_cached_data_.Get(); } -}; - -class UncompressionInfo { - const UncompressionContext& context_; - const CompressionDict& dict_; - const CompressionType type_; - - public: - UncompressionInfo(const UncompressionContext& _context, - const CompressionDict& _dict, CompressionType _type) - : context_(_context), dict_(_dict), type_(_type) {} - - const UncompressionContext& context() const { return context_; } - const CompressionDict& dict() const { return dict_; } CompressionType type() const { return type_; } + const Slice& dict() const { return dict_; } + Slice& dict() { return dict_; } }; inline bool Snappy_Supported() { @@ -497,8 +343,9 @@ inline std::string CompressionTypeToString(CompressionType compression_type) { // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // start of compressed block. Snappy format is the same as version 1. 
-inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input, - size_t length, ::std::string* output) { +inline bool Snappy_Compress(const CompressionContext& /*ctx*/, + const char* input, size_t length, + ::std::string* output) { #ifdef SNAPPY output->resize(snappy::MaxCompressedLength(length)); size_t outlen; @@ -563,7 +410,7 @@ inline bool GetDecompressedSizeInfo(const char** input_data, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. -inline bool Zlib_Compress(const CompressionInfo& info, +inline bool Zlib_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef ZLIB @@ -588,25 +435,24 @@ inline bool Zlib_Compress(const CompressionInfo& info, // The default value is 8. See zconf.h for more details. static const int memLevel = 8; int level; - if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = Z_DEFAULT_COMPRESSION; } else { - level = info.options().level; + level = ctx.options().level; } z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits, - memLevel, info.options().strategy); + int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits, + memLevel, ctx.options().strategy); if (st != Z_OK) { return false; } - Slice compression_dict = info.dict().GetRawDict(); - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary - st = deflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + st = deflateSetDictionary(&_stream, + reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { deflateEnd(&_stream); return false; @@ -634,7 +480,7 @@ inline bool 
Zlib_Compress(const CompressionInfo& info, deflateEnd(&_stream); return compressed; #else - (void)info; + (void)ctx; (void)compress_format_version; (void)input; (void)length; @@ -649,7 +495,7 @@ inline bool Zlib_Compress(const CompressionInfo& info, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. -inline char* Zlib_Uncompress(const UncompressionInfo& info, +inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_data, size_t input_length, int* decompress_size, uint32_t compress_format_version, @@ -682,12 +528,11 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info, return nullptr; } - Slice compression_dict = info.dict().GetRawDict(); - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary - st = inflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + st = inflateSetDictionary(&_stream, + reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { return nullptr; } @@ -740,7 +585,7 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info, inflateEnd(&_stream); return output; #else - (void)info; + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; @@ -754,7 +599,7 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info, // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format -inline bool BZip2_Compress(const CompressionInfo& /*info*/, +inline bool BZip2_Compress(const CompressionContext& /*ctx*/, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef BZIP2 @@ -901,7 +746,7 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4_Compress(const CompressionInfo& info, +inline bool LZ4_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef LZ4 @@ -929,10 +774,9 @@ inline bool LZ4_Compress(const CompressionInfo& info, int outlen; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_stream_t* stream = LZ4_createStream(); - Slice compression_dict = info.dict().GetRawDict(); - if (compression_dict.size()) { - LZ4_loadDict(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_loadDict(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } #if LZ4_VERSION_NUMBER >= 10700 // r129+ outlen = @@ -955,7 +799,7 @@ inline bool LZ4_Compress(const CompressionInfo& info, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)info; + (void)ctx; (void)compress_format_version; (void)input; (void)length; @@ -970,7 +814,7 @@ inline bool LZ4_Compress(const CompressionInfo& info, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* LZ4_Uncompress(const UncompressionInfo& info, +inline char* LZ4_Uncompress(const UncompressionContext& ctx, const char* input_data, size_t input_length, int* decompress_size, uint32_t compress_format_version) { @@ -996,10 +840,9 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info, char* output = new char[output_len]; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); - Slice compression_dict = info.dict().GetRawDict(); - if (compression_dict.size()) { - LZ4_setStreamDecode(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_setStreamDecode(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } *decompress_size = LZ4_decompress_safe_continue( stream, input_data, output, static_cast(input_length), @@ -1018,7 +861,7 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info, assert(*decompress_size == static_cast(output_len)); return output; #else // LZ4 - (void)info; + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; @@ -1033,7 +876,7 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4HC_Compress(const CompressionInfo& info, +inline bool LZ4HC_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef LZ4 @@ -1060,18 +903,17 @@ inline bool LZ4HC_Compress(const CompressionInfo& info, int outlen; int level; - if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = 0; // lz4hc.h says any value < 1 will be sanitized to default } else { - level = info.options().level; + level = ctx.options().level; } #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_resetStreamHC(stream, level); - Slice compression_dict = info.dict().GetRawDict(); const char* compression_dict_data = - compression_dict.size() > 0 ? compression_dict.data() : nullptr; - size_t compression_dict_size = compression_dict.size(); + ctx.dict().size() > 0 ? ctx.dict().data() : nullptr; + size_t compression_dict_size = ctx.dict().size(); LZ4_loadDictHC(stream, compression_dict_data, static_cast(compression_dict_size)); @@ -1102,7 +944,7 @@ inline bool LZ4HC_Compress(const CompressionInfo& info, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)info; + (void)ctx; (void)compress_format_version; (void)input; (void)length; @@ -1136,7 +978,9 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/, } #endif -inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, +// @param compression_dict Data for presetting the compression library's +// dictionary. 
+inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, size_t length, ::std::string* output) { #ifdef ZSTD if (length > std::numeric_limits::max()) { @@ -1151,29 +995,19 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, output->resize(static_cast(output_header_len + compressBound)); size_t outlen = 0; int level; - if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // https://github.com/facebook/zstd/issues/1148 level = 3; } else { - level = info.options().level; + level = ctx.options().level; } #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_CCtx* context = info.context().ZSTDPreallocCtx(); + ZSTD_CCtx* context = ctx.ZSTDPreallocCtx(); assert(context != nullptr); -#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ - if (info.dict().GetDigestedZstdCDict() != nullptr) { - outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len], - compressBound, input, length, - info.dict().GetDigestedZstdCDict()); - } -#endif // ZSTD_VERSION_NUMBER >= 700 - if (outlen == 0) { - outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len], - compressBound, input, length, - info.dict().GetRawDict().data(), - info.dict().GetRawDict().size(), level); - } + outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len], + compressBound, input, length, + ctx.dict().data(), ctx.dict().size(), level); #else // up to v0.4.x outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, length, level); @@ -1184,7 +1018,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, output->resize(output_header_len + outlen); return true; #else // ZSTD - (void)info; + (void)ctx; (void)input; (void)length; (void)output; @@ -1194,7 +1028,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, // @param 
compression_dict Data for presetting the compression library's // dictionary. -inline char* ZSTD_Uncompress(const UncompressionInfo& info, +inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_data, size_t input_length, int* decompress_size) { #ifdef ZSTD @@ -1205,24 +1039,14 @@ inline char* ZSTD_Uncompress(const UncompressionInfo& info, } char* output = new char[output_len]; - size_t actual_output_length = 0; + size_t actual_output_length; #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_DCtx* context = info.context().GetZSTDContext(); + ZSTD_DCtx* context = ctx.GetZSTDContext(); assert(context != nullptr); -#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ - if (info.dict().GetDigestedZstdDDict() != nullptr) { - actual_output_length = ZSTD_decompress_usingDDict( - context, output, output_len, input_data, input_length, - info.dict().GetDigestedZstdDDict()); - } -#endif // ZSTD_VERSION_NUMBER >= 700 - if (actual_output_length == 0) { - actual_output_length = ZSTD_decompress_usingDict( - context, output, output_len, input_data, input_length, - info.dict().GetRawDict().data(), info.dict().GetRawDict().size()); - } + actual_output_length = ZSTD_decompress_usingDict( + context, output, output_len, input_data, input_length, ctx.dict().data(), + ctx.dict().size()); #else // up to v0.4.x - (void) info; actual_output_length = ZSTD_decompress(output, output_len, input_data, input_length); #endif // ZSTD_VERSION_NUMBER >= 500 @@ -1230,7 +1054,7 @@ inline char* ZSTD_Uncompress(const UncompressionInfo& info, *decompress_size = static_cast(actual_output_length); return output; #else // ZSTD - (void)info; + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; @@ -1245,10 +1069,6 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples, // available for dynamic linking until v1.1.3. For now we enable the feature // in v1.1.3+ only. 
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+ - assert(samples.empty() == sample_lens.empty()); - if (samples.empty()) { - return ""; - } std::string dict_data(max_dict_bytes, '\0'); size_t dict_len = ZDICT_trainFromBuffer( &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0], diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 867b83a219..50c9a07846 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -738,11 +738,9 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, return raw; } StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); - CompressionType type = bdb_options_.compression; - CompressionOptions opts; - CompressionContext context(type); - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type); - CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, + CompressionType ct = bdb_options_.compression; + CompressionContext compression_ctx(ct); + CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat, compression_output); return *compression_output; } @@ -1075,11 +1073,9 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, { StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS); - UncompressionContext context(bfile->compression()); - UncompressionInfo info(context, CompressionDict::GetEmptyDict(), - bfile->compression()); + UncompressionContext uncompression_ctx(bfile->compression()); s = UncompressBlockContentsForCompressionType( - info, blob_value.data(), blob_value.size(), &contents, + uncompression_ctx, blob_value.data(), blob_value.size(), &contents, kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); } value->PinSelf(contents.data); diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index e5ab07c47d..7ce0697e3b 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -208,12 +208,10 
@@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, if (compression != kNoCompression && (show_uncompressed_blob != DisplayType::kNone || show_summary)) { BlockContents contents; - UncompressionContext context(compression); - UncompressionInfo info(context, CompressionDict::GetEmptyDict(), - compression); + UncompressionContext uncompression_ctx(compression); s = UncompressBlockContentsForCompressionType( - info, slice.data() + key_size, static_cast(value_size), &contents, - 2 /*compress_format_version*/, ImmutableCFOptions(Options())); + uncompression_ctx, slice.data() + key_size, static_cast(value_size), + &contents, 2 /*compress_format_version*/, ImmutableCFOptions(Options())); if (!s.ok()) { return s; } diff --git a/utilities/column_aware_encoding_util.cc b/utilities/column_aware_encoding_util.cc index fca4fea9ea..222ee46803 100644 --- a/utilities/column_aware_encoding_util.cc +++ b/utilities/column_aware_encoding_util.cc @@ -89,9 +89,8 @@ void ColumnAwareEncodingReader::DecodeBlocks( CompressionType type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != kNoCompression) { - UncompressionContext context(type); - UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type); - UncompressBlockContents(info, slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, format_version, ioptions); content_ptr = contents.data.data(); @@ -172,9 +171,8 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat( CompressionType type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != kNoCompression) { - UncompressionContext context(type); - UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type); - UncompressBlockContents(info, slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + 
UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, format_version, ioptions); decoded_content = std::string(contents.data.data(), contents.data.size()); @@ -245,12 +243,10 @@ namespace { void CompressDataBlock(const std::string& output_content, Slice* slice_final, CompressionType* type, std::string* compressed_output) { - CompressionContext context(*type); - CompressionOptions opts; - CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), *type); + CompressionContext compression_ctx(*type); uint32_t format_version = 2; // hard-coded version - *slice_final = CompressBlock(output_content, info, type, format_version, - compressed_output); + *slice_final = CompressBlock(output_content, compression_ctx, type, + format_version, compressed_output); } } // namespace