diff --git a/CMakeLists.txt b/CMakeLists.txt index b49a13572b..0bd7311498 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -601,6 +601,7 @@ set(SOURCES table/block_based/full_filter_block.cc table/block_based/index_builder.cc table/block_based/partitioned_filter_block.cc + table/block_based/uncompression_dict_reader.cc table/block_fetcher.cc table/bloom_block.cc table/cuckoo/cuckoo_table_builder.cc diff --git a/HISTORY.md b/HISTORY.md index 04f194e925..d452a68a30 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,9 +6,10 @@ ### Public API Change * Now DB::Close() will return Aborted() error when there is unreleased snapshot. Users can retry after all snapshots are released. -* Index and filter blocks are now handled similarly to data blocks with regards to the block cache: instead of storing reader objects in the cache, only the blocks themselves are cached. In addition, index and filter blocks (as well as filter partitions) no longer get evicted from the cache when a table is closed. Moreover, index blocks can now use the compressed block cache (if any). +* Index, filter, and compression dictionary blocks are now handled similarly to data blocks with regards to the block cache: instead of storing objects in the cache, only the blocks themselves are cached. In addition, index, filter, and compression dictionary blocks (as well as filter partitions) no longer get evicted from the cache when a table is closed. Moreover, index blocks can now use the compressed block cache (if any), and cached index blocks can be shared among multiple table readers. * Partitions of partitioned indexes no longer affect the read amplification statistics. -* Due to the above refactoring, block cache eviction statistics for indexes and filters are temporarily broken. We plan to reintroduce them in a later phase. +* Due to the above refactoring, block cache eviction statistics for indexes, filters, and compression dictionaries are temporarily broken. We plan to reintroduce them in a later phase. +* Errors related to the retrieval of the compression dictionary are now propagated to the user. * options.keep_log_file_num will be enforced strictly all the time. File names of all log files will be tracked, which may take significantly amount of memory if options.keep_log_file_num is large and either of options.max_log_file_size or options.log_file_time_to_roll is set. * Add initial support for Get/Put with user timestamps. Users can specify timestamps via ReadOptions and WriteOptions when calling DB::Get and DB::Put. * Accessing a partition of a partitioned filter or index through a pinned reference is no longer considered a cache hit. @@ -26,6 +27,7 @@ * Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL. * Add an option `failed_move_fall_back_to_copy` (default is true) for external SST ingestion. When `move_files` is true and hard link fails, ingestion falls back to copy if `failed_move_fall_back_to_copy` is true. Otherwise, ingestion reports an error. * Add argument `--secondary_path` to ldb to open the database as the secondary instance. This would keep the original DB intact. +* Compression dictionary blocks are now prefetched and pinned in the cache (based on the customer's settings) the same way as index and filter blocks. ### Performance Improvements * Reduce binary search when iterator reseek into the same data block. @@ -35,6 +37,7 @@ * Log Writer will flush after finishing the whole record, rather than a fragment. * Lower MultiGet batching API latency by reading data blocks from disk in parallel * Improve performance of row_cache: make reads with newer snapshots than data in an SST file share the same cache key, except in some transaction cases. +* The compression dictionary is no longer copied to a new object upon retrieval. ### General Improvements * Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress. diff --git a/TARGETS b/TARGETS index 9246af3636..122da8b542 100644 --- a/TARGETS +++ b/TARGETS @@ -198,6 +198,7 @@ cpp_library( "table/block_based/full_filter_block.cc", "table/block_based/index_builder.cc", "table/block_based/partitioned_filter_block.cc", + "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", "table/bloom_block.cc", "table/cuckoo/cuckoo_table_builder.cc", diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index 77f37da0d4..422fd83bc2 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -19,6 +19,9 @@ class DBBlockCacheTest : public DBTestBase { size_t hit_count_ = 0; size_t insert_count_ = 0; size_t failure_count_ = 0; + size_t compression_dict_miss_count_ = 0; + size_t compression_dict_hit_count_ = 0; + size_t compression_dict_insert_count_ = 0; size_t compressed_miss_count_ = 0; size_t compressed_hit_count_ = 0; size_t compressed_insert_count_ = 0; @@ -69,6 +72,15 @@ class DBBlockCacheTest : public DBTestBase { TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); } + void RecordCacheCountersForCompressionDict(const Options& options) { + compression_dict_miss_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + compression_dict_hit_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT); + compression_dict_insert_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD); + } + void CheckCacheCounters(const Options& options, size_t expected_misses, size_t expected_hits, size_t expected_inserts, size_t expected_failures) { @@ -87,6 +99,28 @@ class DBBlockCacheTest : public DBTestBase { failure_count_ = new_failure_count; } + void CheckCacheCountersForCompressionDict( + const Options& options, size_t expected_compression_dict_misses, + size_t expected_compression_dict_hits, + size_t expected_compression_dict_inserts) { + size_t new_compression_dict_miss_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS); + size_t new_compression_dict_hit_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT); + size_t new_compression_dict_insert_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD); + ASSERT_EQ(compression_dict_miss_count_ + expected_compression_dict_misses, + new_compression_dict_miss_count); + ASSERT_EQ(compression_dict_hit_count_ + expected_compression_dict_hits, + new_compression_dict_hit_count); + ASSERT_EQ( + compression_dict_insert_count_ + expected_compression_dict_inserts, + new_compression_dict_insert_count); + compression_dict_miss_count_ = new_compression_dict_miss_count; + compression_dict_hit_count_ = new_compression_dict_hit_count; + compression_dict_insert_count_ = new_compression_dict_insert_count; + } + void CheckCompressedCacheCounters(const Options& options, size_t expected_misses, size_t expected_hits, @@ -671,6 +705,8 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) { options.table_factory.reset(new BlockBasedTableFactory(table_options)); DestroyAndReopen(options); + RecordCacheCountersForCompressionDict(options); + for (int i = 0; i < kNumFiles; ++i) { ASSERT_EQ(i, NumTableFilesAtLevel(0, 0)); for (int j = 0; j < kNumEntriesPerFile; ++j) { @@ -683,27 +719,26 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) { ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1)); + // Compression dictionary blocks are preloaded. + CheckCacheCountersForCompressionDict( + options, kNumFiles /* expected_compression_dict_misses */, + 0 /* expected_compression_dict_hits */, + kNumFiles /* expected_compression_dict_inserts */); + // Seek to a key in a file. It should cause the SST's dictionary meta-block // to be read. RecordCacheCounters(options); - ASSERT_EQ(0, - TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); - ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD)); - ASSERT_EQ( - TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT), - 0); + RecordCacheCountersForCompressionDict(options); ReadOptions read_options; ASSERT_NE("NOT_FOUND", Get(Key(kNumFiles * kNumEntriesPerFile - 1))); - // Two blocks missed/added: dictionary and data block - // One block hit: index since it's prefetched - CheckCacheCounters(options, 2 /* expected_misses */, 1 /* expected_hits */, - 2 /* expected_inserts */, 0 /* expected_failures */); - ASSERT_EQ(1, - TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); - ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD)); - ASSERT_GT( - TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT), - 0); + // Two block hits: index and dictionary since they are prefetched + // One block missed/added: data block + CheckCacheCounters(options, 1 /* expected_misses */, 2 /* expected_hits */, + 1 /* expected_inserts */, 0 /* expected_failures */); + CheckCacheCountersForCompressionDict( + options, 0 /* expected_compression_dict_misses */, + 1 /* expected_compression_dict_hits */, + 0 /* expected_compression_dict_inserts */); } } diff --git a/db/version_set.cc b/db/version_set.cc index 281065d050..7d477a6806 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3420,16 +3420,10 @@ VersionSet::VersionSet(const std::string& dbname, env_options_(storage_options), block_cache_tracer_(block_cache_tracer) {} -void CloseTables(void* ptr, size_t) { - TableReader* table_reader = reinterpret_cast(ptr); - table_reader->Close(); -} - VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet Cache* table_cache = column_family_set_->get_table_cache(); - table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */); column_family_set_.reset(); for (auto& file : obsolete_files_) { if (file.metadata->table_reader_handle) { diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 410c2cf827..6bde575e0f 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -250,11 +250,6 @@ class Cache { virtual std::string GetPrintableOptions() const { return ""; } - // Mark the last inserted object as being a raw data block. This will be used - // in tests. The default implementation does nothing. - virtual void TEST_mark_as_data_block(const Slice& /*key*/, - size_t /*charge*/) {} - MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); } private: diff --git a/src.mk b/src.mk index 4d635173b8..0f04fc7391 100644 --- a/src.mk +++ b/src.mk @@ -121,6 +121,7 @@ LIB_SOURCES = \ table/block_based/full_filter_block.cc \ table/block_based/index_builder.cc \ table/block_based/partitioned_filter_block.cc \ + table/block_based/uncompression_dict_reader.cc \ table/block_fetcher.cc \ table/bloom_block.cc \ table/cuckoo/cuckoo_table_builder.cc \ diff --git a/table/block_based/block_based_filter_block_test.cc b/table/block_based/block_based_filter_block_test.cc index 70bbde96ac..d223dec6e1 100644 --- a/table/block_based/block_based_filter_block_test.cc +++ b/table/block_based/block_based_filter_block_test.cc @@ -45,10 +45,7 @@ class TestHashFilter : public FilterPolicy { class MockBlockBasedTable : public BlockBasedTable { public: explicit MockBlockBasedTable(Rep* rep) - : BlockBasedTable(rep, nullptr /* block_cache_tracer */) { - // Initialize what Open normally does as much as necessary for the test - rep->cache_key_prefix_size = 10; - } + : BlockBasedTable(rep, nullptr /* block_cache_tracer */) {} }; class FilterBlockTest : public testing::Test { @@ -64,7 +61,6 @@ class FilterBlockTest : public testing::Test { : ioptions_(options_), env_options_(options_), icomp_(options_.comparator) { - table_options_.no_block_cache = true; table_options_.filter_policy.reset(new TestHashFilter); constexpr bool skip_filters = false; @@ -271,7 +267,6 @@ class BlockBasedFilterBlockTest : public testing::Test { : ioptions_(options_), env_options_(options_), icomp_(options_.comparator) { - table_options_.no_block_cache = true; table_options_.filter_policy.reset(NewBloomFilterPolicy(10)); constexpr bool skip_filters = false; diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 000bc295fc..314763ec3b 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -63,7 +63,6 @@ extern const std::string kHashIndexPrefixesMetadataBlock; typedef BlockBasedTable::IndexReader IndexReader; BlockBasedTable::~BlockBasedTable() { - Close(); delete rep_; } @@ -148,8 +147,6 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { delete entry; } -void DeleteCachedUncompressionDictEntry(const Slice& key, void* value); - // Release the cached entry and decrement its ref count. void ForceReleaseCachedEntry(void* arg, void* h) { Cache* cache = reinterpret_cast(arg); @@ -1419,37 +1416,6 @@ Status BlockBasedTable::ReadRangeDelBlock( return s; } -Status BlockBasedTable::ReadCompressionDictBlock( - FilePrefetchBuffer* prefetch_buffer, - std::unique_ptr* compression_dict_block) const { - assert(compression_dict_block != nullptr); - Status s; - if (!rep_->compression_dict_handle.IsNull()) { - std::unique_ptr compression_dict_cont{new BlockContents()}; - PersistentCacheOptions cache_options; - ReadOptions read_options; - read_options.verify_checksums = true; - BlockFetcher compression_block_fetcher( - rep_->file.get(), prefetch_buffer, rep_->footer, read_options, - rep_->compression_dict_handle, compression_dict_cont.get(), - rep_->ioptions, false /* decompress */, false /*maybe_compressed*/, - BlockType::kCompressionDictionary, UncompressionDict::GetEmptyDict(), - cache_options); - s = compression_block_fetcher.ReadBlockContents(); - - if (!s.ok()) { - ROCKS_LOG_WARN( - rep_->ioptions.info_log, - "Encountered error while reading data from compression dictionary " - "block %s", - s.ToString().c_str()); - } else { - *compression_dict_block = std::move(compression_dict_cont); - } - } - return s; -} - Status BlockBasedTable::PrefetchIndexAndFilterBlocks( FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all, @@ -1555,23 +1521,16 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( } } - // TODO(ajkr): also prefetch compression dictionary block - // TODO(ajkr): also pin compression dictionary block when - // `pin_l0_filter_and_index_blocks_in_cache == true`. - if (!table_options.cache_index_and_filter_blocks) { - std::unique_ptr compression_dict_block; - s = ReadCompressionDictBlock(prefetch_buffer, &compression_dict_block); + if (!rep_->compression_dict_handle.IsNull()) { + std::unique_ptr uncompression_dict_reader; + s = UncompressionDictReader::Create(this, prefetch_buffer, use_cache, + prefetch_all, pin_all, lookup_context, + &uncompression_dict_reader); if (!s.ok()) { return s; } - if (!rep_->compression_dict_handle.IsNull()) { - assert(compression_dict_block != nullptr); - // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy - rep_->uncompression_dict.reset(new UncompressionDict( - compression_dict_block->data.ToString(), - rep_->blocks_definitely_zstd_compressed, rep_->ioptions.statistics)); - } + rep_->uncompression_dict_reader = std::move(uncompression_dict_reader); } assert(s.ok()); @@ -1609,8 +1568,8 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const { if (rep_->index_reader) { usage += rep_->index_reader->ApproximateMemoryUsage(); } - if (rep_->uncompression_dict) { - usage += rep_->uncompression_dict->ApproximateMemoryUsage(); + if (rep_->uncompression_dict_reader) { + usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage(); } return usage; } @@ -1757,9 +1716,6 @@ Status BlockBasedTable::GetDataBlockFromCache( Cache::Handle* cache_handle = nullptr; s = block_cache->Insert(block_cache_key, block_holder.get(), charge, &DeleteCachedEntry, &cache_handle); -#ifndef NDEBUG - block_cache->TEST_mark_as_data_block(block_cache_key, charge); -#endif // NDEBUG if (s.ok()) { assert(cache_handle != nullptr); block->SetCachedValue(block_holder.release(), block_cache, @@ -1863,9 +1819,6 @@ Status BlockBasedTable::PutDataBlockToCache( s = block_cache->Insert(block_cache_key, block_holder.get(), charge, &DeleteCachedEntry, &cache_handle, priority); -#ifndef NDEBUG - block_cache->TEST_mark_as_data_block(block_cache_key, charge); -#endif // NDEBUG if (s.ok()) { assert(cache_handle != nullptr); cached_block->SetCachedValue(block_holder.release(), block_cache, @@ -1914,86 +1867,6 @@ std::unique_ptr BlockBasedTable::CreateFilterBlockReader( } } -CachableEntry BlockBasedTable::GetUncompressionDict( - FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context) const { - if (!rep_->table_options.cache_index_and_filter_blocks) { - // block cache is either disabled or not used for meta-blocks. In either - // case, BlockBasedTableReader is the owner of the uncompression dictionary. - return {rep_->uncompression_dict.get(), nullptr /* cache */, - nullptr /* cache_handle */, false /* own_value */}; - } - if (rep_->compression_dict_handle.IsNull()) { - return CachableEntry(); - } - char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto cache_key = - GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - rep_->compression_dict_handle, cache_key_buf); - auto cache_handle = - GetEntryFromCache(rep_->table_options.block_cache.get(), cache_key, - BlockType::kCompressionDictionary, get_context); - UncompressionDict* dict = nullptr; - bool is_cache_hit = false; - size_t usage = 0; - if (cache_handle != nullptr) { - dict = reinterpret_cast( - rep_->table_options.block_cache->Value(cache_handle)); - is_cache_hit = true; - usage = dict->ApproximateMemoryUsage(); - } else if (no_io) { - // Do not invoke any io. - } else { - std::unique_ptr compression_dict_block; - Status s = - ReadCompressionDictBlock(prefetch_buffer, &compression_dict_block); - if (s.ok()) { - assert(compression_dict_block != nullptr); - // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy - std::unique_ptr uncompression_dict( - new UncompressionDict(compression_dict_block->data.ToString(), - rep_->blocks_definitely_zstd_compressed, - rep_->ioptions.statistics)); - usage = uncompression_dict->ApproximateMemoryUsage(); - s = rep_->table_options.block_cache->Insert( - cache_key, uncompression_dict.get(), usage, - &DeleteCachedUncompressionDictEntry, &cache_handle, - rep_->table_options.cache_index_and_filter_blocks_with_high_priority - ? Cache::Priority::HIGH - : Cache::Priority::LOW); - - if (s.ok()) { - UpdateCacheInsertionMetrics(BlockType::kCompressionDictionary, - get_context, usage); - dict = uncompression_dict.release(); - } else { - RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES); - assert(dict == nullptr); - assert(cache_handle == nullptr); - } - } - } - if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() && - lookup_context) { - // Avoid making copy of block_key and cf_name when constructing the access - // record. - BlockCacheTraceRecord access_record( - rep_->ioptions.env->NowMicros(), - /*block_key=*/"", TraceType::kBlockTraceUncompressionDictBlock, - /*block_size=*/usage, rep_->cf_id_for_tracing(), - /*cf_name=*/"", rep_->level_for_tracing(), - rep_->sst_number_for_tracing(), lookup_context->caller, is_cache_hit, - /*no_insert=*/no_io, lookup_context->get_id, - lookup_context->get_from_user_specified_snapshot, - /*referenced_key=*/""); - block_cache_tracer_->WriteBlockAccess(access_record, cache_key, - rep_->cf_name_for_tracing(), - lookup_context->referenced_key); - } - return {dict, cache_handle ? rep_->table_options.block_cache.get() : nullptr, - cache_handle, false /* own_value */}; -} - // disable_prefix_seek should be set to true when prefix_extractor found in SST // differs from the one in mutable_cf_options and index type is HashBasedIndex InternalIteratorBase* BlockBasedTable::NewIndexIterator( @@ -2028,13 +1901,17 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( return iter; } - const bool no_io = (ro.read_tier == kBlockCacheTier); - auto uncompression_dict_storage = - GetUncompressionDict(prefetch_buffer, no_io, get_context, lookup_context); - const UncompressionDict& uncompression_dict = - uncompression_dict_storage.GetValue() == nullptr - ? UncompressionDict::GetEmptyDict() - : *uncompression_dict_storage.GetValue(); + UncompressionDict uncompression_dict; + if (rep_->uncompression_dict_reader) { + const bool no_io = (ro.read_tier == kBlockCacheTier); + s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + prefetch_buffer, no_io, get_context, lookup_context, + &uncompression_dict); + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + } CachableEntry block; s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block, @@ -2268,7 +2145,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) { Statistics* statistics = rep_->ioptions.statistics; const bool maybe_compressed = - block_type != BlockType::kFilter && rep_->blocks_maybe_compressed; + block_type != BlockType::kFilter && + block_type != BlockType::kCompressionDictionary && + rep_->blocks_maybe_compressed; const bool do_uncompress = maybe_compressed && !block_cache_compressed; CompressionType raw_block_comp_type; BlockContents raw_block_contents; @@ -2321,6 +2200,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( case BlockType::kFilter: trace_block_type = TraceType::kBlockTraceFilterBlock; break; + case BlockType::kCompressionDictionary: + trace_block_type = TraceType::kBlockTraceUncompressionDictBlock; + break; case BlockType::kRangeDeletion: trace_block_type = TraceType::kBlockTraceRangeDeletionBlock; break; @@ -2568,7 +2450,9 @@ Status BlockBasedTable::RetrieveBlock( } const bool maybe_compressed = - block_type != BlockType::kFilter && rep_->blocks_maybe_compressed; + block_type != BlockType::kFilter && + block_type != BlockType::kCompressionDictionary && + rep_->blocks_maybe_compressed; const bool do_uncompress = maybe_compressed; std::unique_ptr block; @@ -3504,12 +3388,17 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, { MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(), sst_file_range.end()); - auto uncompression_dict_storage = GetUncompressionDict( - nullptr, no_io, sst_file_range.begin()->get_context, &lookup_context); - const UncompressionDict& uncompression_dict = - uncompression_dict_storage.GetValue() == nullptr - ? UncompressionDict::GetEmptyDict() - : *uncompression_dict_storage.GetValue(); + + UncompressionDict uncompression_dict; + Status uncompression_dict_status; + if (rep_->uncompression_dict_reader) { + uncompression_dict_status = + rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + nullptr /* prefetch_buffer */, no_io, + sst_file_range.begin()->get_context, &lookup_context, + &uncompression_dict); + } + size_t total_len = 0; ReadOptions ro = read_options; ro.read_tier = kBlockCacheTier; @@ -3535,6 +3424,14 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, sst_file_range.SkipKey(miter); continue; } + + if (!uncompression_dict_status.ok()) { + *(miter->s) = uncompression_dict_status; + data_block_range.SkipKey(miter); + sst_file_range.SkipKey(miter); + continue; + } + statuses.emplace_back(); results.emplace_back(); if (v.handle.offset() == offset) { @@ -4191,23 +4088,25 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { } // Output compression dictionary - if (!rep_->compression_dict_handle.IsNull()) { - std::unique_ptr compression_dict_block; - s = ReadCompressionDictBlock(nullptr /* prefetch_buffer */, - &compression_dict_block); + if (rep_->uncompression_dict_reader) { + UncompressionDict uncompression_dict; + s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + nullptr /* prefetch_buffer */, false /* no_io */, + nullptr /* get_context */, nullptr /* lookup_context */, + &uncompression_dict); if (!s.ok()) { return s; } - assert(compression_dict_block != nullptr); - auto compression_dict = compression_dict_block->data; + + const Slice& raw_dict = uncompression_dict.GetRawDict(); out_file->Append( "Compression Dictionary:\n" "--------------------------------------\n"); out_file->Append(" size (bytes): "); - out_file->Append(rocksdb::ToString(compression_dict.size())); + out_file->Append(rocksdb::ToString(raw_dict.size())); out_file->Append("\n\n"); out_file->Append(" HEX "); - out_file->Append(compression_dict.ToString(true).c_str()); + out_file->Append(raw_dict.ToString(true).c_str()); out_file->Append("\n\n"); } @@ -4233,29 +4132,6 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { return s; } -void BlockBasedTable::Close() { - if (rep_->closed) { - return; - } - - // cleanup index, filter, and compression dictionary blocks - // to avoid accessing dangling pointers - if (!rep_->table_options.no_block_cache) { - if (!rep_->compression_dict_handle.IsNull()) { - // Get the compression dictionary block key - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto key = - GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - rep_->compression_dict_handle, cache_key); - - Cache* const cache = rep_->table_options.block_cache.get(); - cache->Erase(key); - } - } - - rep_->closed = true; -} - Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) { out_file->Append( "Index Details:\n" @@ -4431,15 +4307,4 @@ void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value, out_file->Append("\n ------\n"); } -namespace { - -void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) { - UncompressionDict* dict = reinterpret_cast(value); - RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, - dict->ApproximateMemoryUsage()); - delete dict; -} - -} // anonymous namespace - } // namespace rocksdb diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 3a16e2995f..85346d75c7 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -29,6 +29,7 @@ #include "table/block_based/block_type.h" #include "table/block_based/cachable_entry.h" #include "table/block_based/filter_block.h" +#include "table/block_based/uncompression_dict_reader.h" #include "table/format.h" #include "table/get_context.h" #include "table/multiget_context.h" @@ -176,8 +177,6 @@ class BlockBasedTable : public TableReader { Status VerifyChecksum(TableReaderCaller caller) override; - void Close() override; - ~BlockBasedTable(); bool TEST_FilterBlockInCache() const; @@ -242,8 +241,11 @@ class BlockBasedTable : public TableReader { template friend class FilterBlockReaderCommon; + friend class PartitionIndexReader; + friend class UncompressionDictReader; + protected: Rep* rep_; explicit BlockBasedTable(Rep* rep, BlockCacheTracer* const block_cache_tracer) @@ -313,10 +315,6 @@ class BlockBasedTable : public TableReader { CachableEntry, MultiGetContext::MAX_BATCH_SIZE>* results, char* scratch, const UncompressionDict& uncompression_dict) const; - CachableEntry GetUncompressionDict( - FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context) const; - // Get the iterator from the index reader. // // If input_iter is not set, return a new Iterator. @@ -416,9 +414,6 @@ class BlockBasedTable : public TableReader { InternalIterator* meta_iter, const InternalKeyComparator& internal_comparator, BlockCacheLookupContext* lookup_context); - Status ReadCompressionDictBlock( - FilePrefetchBuffer* prefetch_buffer, - std::unique_ptr* compression_dict_block) const; Status PrefetchIndexAndFilterBlocks( FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all, @@ -514,7 +509,7 @@ struct BlockBasedTable::Rep { std::unique_ptr index_reader; std::unique_ptr filter; - std::unique_ptr uncompression_dict; + std::unique_ptr uncompression_dict_reader; enum class FilterType { kNoFilter, @@ -566,7 +561,6 @@ struct BlockBasedTable::Rep { bool index_key_includes_seq = true; bool index_value_is_full = true; - bool closed = false; const bool immortal_table; SequenceNumber get_global_seqno(BlockType block_type) const { diff --git a/table/block_based/full_filter_block_test.cc b/table/block_based/full_filter_block_test.cc index e8fcce07d7..b87db6def9 100644 --- a/table/block_based/full_filter_block_test.cc +++ b/table/block_based/full_filter_block_test.cc @@ -44,10 +44,7 @@ class TestFilterBitsBuilder : public FilterBitsBuilder { class MockBlockBasedTable : public BlockBasedTable { public: explicit MockBlockBasedTable(Rep* rep) - : BlockBasedTable(rep, nullptr /* block_cache_tracer */) { - // Initialize what Open normally does as much as necessary for the test - rep->cache_key_prefix_size = 10; - } + : BlockBasedTable(rep, nullptr /* block_cache_tracer */) {} }; class TestFilterBitsReader : public FilterBitsReader { @@ -116,7 +113,6 @@ class PluginFullFilterBlockTest : public testing::Test { : ioptions_(options_), env_options_(options_), icomp_(options_.comparator) { - table_options_.no_block_cache = true; table_options_.filter_policy.reset(new TestHashFilter); constexpr bool skip_filters = false; @@ -210,7 +206,6 @@ class FullFilterBlockTest : public testing::Test { : ioptions_(options_), env_options_(options_), icomp_(options_.comparator) { - table_options_.no_block_cache = true; table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false)); constexpr bool skip_filters = false; diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index ae57e85dca..158ed84abe 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -324,7 +324,7 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { prefetch_buffer.reset(new FilePrefetchBuffer()); s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, - static_cast(prefetch_len)); + static_cast(prefetch_len)); // After prefetch, read the partitions one by one ReadOptions read_options; diff --git a/table/block_based/partitioned_filter_block_test.cc b/table/block_based/partitioned_filter_block_test.cc index 5e9e467723..aa667afedf 100644 --- a/table/block_based/partitioned_filter_block_test.cc +++ b/table/block_based/partitioned_filter_block_test.cc @@ -27,7 +27,6 @@ class MockedBlockBasedTable : public BlockBasedTable { MockedBlockBasedTable(Rep* rep, PartitionedIndexBuilder* pib) : BlockBasedTable(rep, /*block_cache_tracer=*/nullptr) { // Initialize what Open normally does as much as necessary for the test - rep->cache_key_prefix_size = 10; rep->index_key_includes_seq = pib->seperator_is_key_plus_seq(); rep->index_value_is_full = !pib->get_use_value_delta_encoding(); } @@ -67,9 +66,6 @@ class PartitionedFilterBlockTest env_options_(options_), icomp_(options_.comparator) { table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false)); - table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close - // will access variable that are not - // initialized in our mocked version table_options_.format_version = GetParam(); table_options_.index_block_restart_interval = 3; } diff --git a/table/block_based/uncompression_dict_reader.cc b/table/block_based/uncompression_dict_reader.cc new file mode 100644 index 0000000000..d74dbf6c49 --- /dev/null +++ b/table/block_based/uncompression_dict_reader.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "table/block_based/uncompression_dict_reader.h" +#include "monitoring/perf_context_imp.h" +#include "table/block_based/block_based_table_reader.h" +#include "util/compression.h" + +namespace rocksdb { + +Status UncompressionDictReader::Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* uncompression_dict_reader) { + assert(table); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(uncompression_dict_reader); + + CachableEntry uncompression_dict_block; + if (prefetch || !use_cache) { + const Status s = ReadUncompressionDictionaryBlock( + table, prefetch_buffer, ReadOptions(), nullptr /* get_context */, + lookup_context, &uncompression_dict_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + uncompression_dict_block.Reset(); + } + } + + uncompression_dict_reader->reset( + new UncompressionDictReader(table, std::move(uncompression_dict_block))); + + return Status::OK(); +} + +Status UncompressionDictReader::ReadUncompressionDictionaryBlock( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* uncompression_dict_block) { + // TODO: add perf counter for compression dictionary read time + + assert(table); + assert(uncompression_dict_block); + assert(uncompression_dict_block->IsEmpty()); + + const BlockBasedTable::Rep* const rep = table->get_rep(); + assert(rep); + assert(!rep->compression_dict_handle.IsNull()); + + const Status s = table->RetrieveBlock( + prefetch_buffer, read_options, rep->compression_dict_handle, + UncompressionDict::GetEmptyDict(), uncompression_dict_block, + BlockType::kCompressionDictionary, get_context, lookup_context); + + if (!s.ok()) { + ROCKS_LOG_WARN( + rep->ioptions.info_log, + "Encountered error while reading data from compression dictionary " + "block %s", + s.ToString().c_str()); + } + + return s; +} + +Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock( + FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* uncompression_dict_block) const { + assert(uncompression_dict_block); + + if (!uncompression_dict_block_.IsEmpty()) { + uncompression_dict_block->SetUnownedValue( + uncompression_dict_block_.GetValue()); + return Status::OK(); + } + + ReadOptions read_options; + if (no_io) { + read_options.read_tier = kBlockCacheTier; + } + + return ReadUncompressionDictionaryBlock(table_, prefetch_buffer, read_options, + get_context, lookup_context, + uncompression_dict_block); +} + +Status UncompressionDictReader::GetOrReadUncompressionDictionary( + FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + UncompressionDict* uncompression_dict) const { + CachableEntry uncompression_dict_block; + const Status s = GetOrReadUncompressionDictionaryBlock( + prefetch_buffer, no_io, get_context, lookup_context, + &uncompression_dict_block); + + if (!s.ok()) { + return s; + } + + assert(uncompression_dict); + assert(table_); + assert(table_->get_rep()); + + UncompressionDict dict(uncompression_dict_block.GetValue()->data, + table_->get_rep()->blocks_definitely_zstd_compressed); + *uncompression_dict = std::move(dict); + uncompression_dict_block.TransferTo(uncompression_dict); + + return Status::OK(); +} + +size_t UncompressionDictReader::ApproximateMemoryUsage() const { + assert(!uncompression_dict_block_.GetOwnValue() || + uncompression_dict_block_.GetValue() != nullptr); + size_t usage = uncompression_dict_block_.GetOwnValue() + ? uncompression_dict_block_.GetValue()->ApproximateMemoryUsage() + : 0; + +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + + return usage; +} + +} // namespace rocksdb diff --git a/table/block_based/uncompression_dict_reader.h b/table/block_based/uncompression_dict_reader.h new file mode 100644 index 0000000000..808149e96b --- /dev/null +++ b/table/block_based/uncompression_dict_reader.h @@ -0,0 +1,64 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#pragma once + +#include +#include "table/block_based/cachable_entry.h" +#include "table/format.h" + +namespace rocksdb { + +class BlockBasedTable; +struct BlockCacheLookupContext; +class FilePrefetchBuffer; +class GetContext; +struct ReadOptions; +struct UncompressionDict; + +// Provides access to the uncompression dictionary regardless of whether +// it is owned by the reader or stored in the cache, or whether it is pinned +// in the cache or not. +class UncompressionDictReader { + public: + static Status Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* uncompression_dict_reader); + + Status GetOrReadUncompressionDictionary( + FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + UncompressionDict* uncompression_dict) const; + + size_t ApproximateMemoryUsage() const; + + private: + UncompressionDictReader( + const BlockBasedTable* t, + CachableEntry&& uncompression_dict_block) + : table_(t), + uncompression_dict_block_(std::move(uncompression_dict_block)) { + assert(table_); + } + + static Status ReadUncompressionDictionaryBlock( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* uncompression_dict_block); + + Status GetOrReadUncompressionDictionaryBlock( + FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* uncompression_dict_block) const; + + const BlockBasedTable* table_; + CachableEntry uncompression_dict_block_; +}; + +} // namespace rocksdb diff --git a/table/table_reader.h b/table/table_reader.h index 72d11a7bd2..eb383c8fe8 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -124,8 +124,6 @@ class TableReader { virtual Status VerifyChecksum(TableReaderCaller /*caller*/) { return Status::NotSupported("VerifyChecksum() not supported"); } - - virtual void Close() {} }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index bb03431166..6cd26bc732 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2889,176 +2889,6 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) { } } -// A wrapper around LRICache that also keeps track of data blocks (in contrast -// with the objects) in the cache. The class is very simple and can be used only -// for trivial tests. -class MockCache : public LRUCache { - public: - MockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, - double high_pri_pool_ratio) - : LRUCache(capacity, num_shard_bits, strict_capacity_limit, - high_pri_pool_ratio) {} - Status Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value), - Handle** handle = nullptr, - Priority priority = Priority::LOW) override { - // Replace the deleter with our own so that we keep track of data blocks - // erased from the cache - deleters_[key.ToString()] = deleter; - return ShardedCache::Insert(key, value, charge, &MockDeleter, handle, - priority); - } - // This is called by the application right after inserting a data block - void TEST_mark_as_data_block(const Slice& key, size_t charge) override { - marked_data_in_cache_[key.ToString()] = charge; - marked_size_ += charge; - } - using DeleterFunc = void (*)(const Slice& key, void* value); - static std::map deleters_; - static std::map marked_data_in_cache_; - static size_t marked_size_; - static void MockDeleter(const Slice& key, void* value) { - // If the item was marked for being data block, decrease its usage from the - // total data block usage of the cache - if (marked_data_in_cache_.find(key.ToString()) != - marked_data_in_cache_.end()) { - marked_size_ -= marked_data_in_cache_[key.ToString()]; - } - // Then call the origianl deleter - assert(deleters_.find(key.ToString()) != deleters_.end()); - auto deleter = deleters_[key.ToString()]; - deleter(key, value); - } -}; - -size_t MockCache::marked_size_ = 0; -std::map MockCache::deleters_; -std::map MockCache::marked_data_in_cache_; - -// Block cache can contain raw data blocks as well as general objects. If an -// object depends on the table to be live, it then must be destructed before the -// table is closed. This test makes sure that the only items remains in the -// cache after the table is closed are raw data blocks. -TEST_P(BlockBasedTableTest, NoObjectInCacheAfterTableClose) { - std::vector compression_types{kNoCompression}; - - // The following are the compression library versions supporting compression - // dictionaries. See the test case CacheCompressionDict in the - // DBBlockCacheTest suite. -#ifdef ZLIB - compression_types.push_back(kZlibCompression); -#endif // ZLIB -#if LZ4_VERSION_NUMBER >= 10400 - compression_types.push_back(kLZ4Compression); - compression_types.push_back(kLZ4HCCompression); -#endif // LZ4_VERSION_NUMBER >= 10400 -#if ZSTD_VERSION_NUMBER >= 500 - compression_types.push_back(kZSTD); -#endif // ZSTD_VERSION_NUMBER >= 500 - - for (int level: {-1, 0, 1, 10}) { - for (auto index_type : - {BlockBasedTableOptions::IndexType::kBinarySearch, - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) { - for (bool block_based_filter : {true, false}) { - for (bool partition_filter : {true, false}) { - if (partition_filter && - (block_based_filter || - index_type != - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) { - continue; - } - for (bool index_and_filter_in_cache : {true, false}) { - for (bool pin_l0 : {true, false}) { - for (bool pin_top_level : {true, false}) { - if (pin_l0 && !index_and_filter_in_cache) { - continue; - } - - for (auto compression_type : compression_types) { - for (uint32_t max_dict_bytes : {0, 1 << 14}) { - if (compression_type == kNoCompression && max_dict_bytes) - continue; - - // Create a table - Options opt; - std::unique_ptr ikc; - ikc.reset(new test::PlainInternalKeyComparator( - opt.comparator)); - opt.compression = compression_type; - opt.compression_opts.max_dict_bytes = max_dict_bytes; - BlockBasedTableOptions table_options = - GetBlockBasedTableOptions(); - table_options.block_size = 1024; - table_options.index_type = index_type; - table_options.pin_l0_filter_and_index_blocks_in_cache = - pin_l0; - table_options.pin_top_level_index_and_filter = - pin_top_level; - table_options.partition_filters = partition_filter; - table_options.cache_index_and_filter_blocks = - index_and_filter_in_cache; - // big enough so we don't ever lose cached values. - table_options.block_cache = std::make_shared( - 16 * 1024 * 1024, 4, false, 0.0); - table_options.filter_policy.reset( - rocksdb::NewBloomFilterPolicy(10, block_based_filter)); - opt.table_factory.reset(NewBlockBasedTableFactory( - table_options)); - - bool convert_to_internal_key = false; - TableConstructor c(BytewiseComparator(), - convert_to_internal_key, level); - std::string user_key = "k01"; - std::string key = - InternalKey(user_key, 0, kTypeValue).Encode().ToString(); - c.Add(key, "hello"); - std::vector keys; - stl_wrappers::KVMap kvmap; - const ImmutableCFOptions ioptions(opt); - const MutableCFOptions moptions(opt); - c.Finish(opt, ioptions, moptions, table_options, *ikc, - &keys, &kvmap); - - // Doing a read to make index/filter loaded into the cache - auto table_reader = - dynamic_cast(c.GetTableReader()); - PinnableSlice value; - GetContext get_context(opt.comparator, nullptr, nullptr, - nullptr, GetContext::kNotFound, user_key, &value, - nullptr, nullptr, nullptr, nullptr); - InternalKey ikey(user_key, 0, kTypeValue); - auto s = table_reader->Get(ReadOptions(), key, &get_context, - moptions.prefix_extractor.get()); - ASSERT_EQ(get_context.State(), GetContext::kFound); - ASSERT_STREQ(value.data(), "hello"); - - // Close the table - c.ResetTableReader(); - - auto usage = table_options.block_cache->GetUsage(); - auto pinned_usage = - table_options.block_cache->GetPinnedUsage(); - // The only usage must be for marked data blocks - ASSERT_EQ(usage, MockCache::marked_size_); - // There must be some pinned data since PinnableSlice has - // not released them yet - ASSERT_GT(pinned_usage, 0); - // Release pinnable slice reousrces - value.Reset(); - pinned_usage = table_options.block_cache->GetPinnedUsage(); - ASSERT_EQ(pinned_usage, 0); - } - } - } - } - } - } - } - } - } // level -} - TEST_P(BlockBasedTableTest, BlockCacheLeak) { // Check that when we reopen a table we don't lose access to blocks already // in the cache. This test checks whether the Table actually makes use of the diff --git a/util/compression.h b/util/compression.h index aa8af74499..5dbb6c244a 100644 --- a/util/compression.h +++ b/util/compression.h @@ -21,6 +21,7 @@ #include #include "memory/memory_allocator.h" +#include "rocksdb/cleanable.h" #include "rocksdb/options.h" #include "rocksdb/table.h" #include "util/coding.h" @@ -216,36 +217,60 @@ struct CompressionDict { // Holds dictionary and related data, like ZSTD's digested uncompression // dictionary. -struct UncompressionDict { -#ifdef ROCKSDB_ZSTD_DDICT - ZSTD_DDict* zstd_ddict_; -#endif // ROCKSDB_ZSTD_DDICT - // Block containing the data for the compression dictionary. It may be - // redundant with the data held in `zstd_ddict_`. +struct UncompressionDict : public Cleanable { + // Block containing the data for the compression dictionary. It is non-empty + // only if the constructor that takes a string parameter is used. std::string dict_; - // This `Statistics` pointer is intended to be used upon block cache eviction, - // so only needs to be populated on `UncompressionDict`s that'll be inserted - // into block cache. - Statistics* statistics_; + + // Slice pointing to the compression dictionary data. Points to + // dict_ if the string constructor is used. In the case of the Slice + // constructor, it is a copy of the Slice passed by the caller. + Slice slice_; #ifdef ROCKSDB_ZSTD_DDICT - UncompressionDict(std::string dict, bool using_zstd, - Statistics* _statistics = nullptr) { -#else // ROCKSDB_ZSTD_DDICT - UncompressionDict(std::string dict, bool /*using_zstd*/, - Statistics* _statistics = nullptr) { + // Processed version of the contents of slice_ for ZSTD compression. + ZSTD_DDict* zstd_ddict_ = nullptr; #endif // ROCKSDB_ZSTD_DDICT - dict_ = std::move(dict); - statistics_ = _statistics; + + // Slice constructor: it is the caller's responsibility to either + // a) make sure slice remains valid throughout the lifecycle of this object OR + // b) transfer the management of the underlying resource (e.g. cache handle) + // to this object, in which case UncompressionDict is self-contained, and the + // resource is guaranteed to be released (via the cleanup logic in Cleanable) + // when UncompressionDict is destroyed. #ifdef ROCKSDB_ZSTD_DDICT - zstd_ddict_ = nullptr; - if (!dict_.empty() && using_zstd) { - zstd_ddict_ = ZSTD_createDDict_byReference(dict_.data(), dict_.size()); + UncompressionDict(Slice slice, bool using_zstd) +#else // ROCKSDB_ZSTD_DDICT + UncompressionDict(Slice slice, bool /*using_zstd*/) +#endif // ROCKSDB_ZSTD_DDICT + : slice_(std::move(slice)) { +#ifdef ROCKSDB_ZSTD_DDICT + if (!slice_.empty() && using_zstd) { + zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size()); assert(zstd_ddict_ != nullptr); } #endif // ROCKSDB_ZSTD_DDICT } + // String constructor: results in a self-contained UncompressionDict. + UncompressionDict(std::string dict, bool using_zstd) + : UncompressionDict(Slice(dict), using_zstd) { + dict_ = std::move(dict); + } + + UncompressionDict(UncompressionDict&& rhs) + : dict_(std::move(rhs.dict_)), + slice_(std::move(rhs.slice_)) +#ifdef ROCKSDB_ZSTD_DDICT + , + zstd_ddict_(rhs.zstd_ddict_) +#endif + { +#ifdef ROCKSDB_ZSTD_DDICT + rhs.zstd_ddict_ = nullptr; +#endif + } + ~UncompressionDict() { #ifdef ROCKSDB_ZSTD_DDICT size_t res = 0; @@ -257,20 +282,34 @@ struct UncompressionDict { #endif // ROCKSDB_ZSTD_DDICT } + UncompressionDict& operator=(UncompressionDict&& rhs) { + if (this == &rhs) { + return *this; + } + + dict_ = std::move(rhs.dict_); + slice_ = std::move(rhs.slice_); + +#ifdef ROCKSDB_ZSTD_DDICT + zstd_ddict_ = rhs.zstd_ddict_; + rhs.zstd_ddict_ = nullptr; +#endif + + return *this; + } + + const Slice& GetRawDict() const { return slice_; } + #ifdef ROCKSDB_ZSTD_DDICT const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; } #endif // ROCKSDB_ZSTD_DDICT - Slice GetRawDict() const { return dict_; } - static const UncompressionDict& GetEmptyDict() { static UncompressionDict empty_dict{}; return empty_dict; } - Statistics* statistics() const { return statistics_; } - - size_t ApproximateMemoryUsage() { + size_t ApproximateMemoryUsage() const { size_t usage = 0; usage += sizeof(struct UncompressionDict); #ifdef ROCKSDB_ZSTD_DDICT @@ -281,11 +320,9 @@ struct UncompressionDict { } UncompressionDict() = default; - // Disable copy/move + // Disable copy UncompressionDict(const CompressionDict&) = delete; UncompressionDict& operator=(const CompressionDict&) = delete; - UncompressionDict(CompressionDict&&) = delete; - UncompressionDict& operator=(CompressionDict&&) = delete; }; class CompressionContext { @@ -725,7 +762,7 @@ inline CacheAllocationPtr Zlib_Uncompress( return nullptr; } - Slice compression_dict = info.dict().GetRawDict(); + const Slice& compression_dict = info.dict().GetRawDict(); if (compression_dict.size()) { // Initialize the compression library's dictionary st = inflateSetDictionary( @@ -1040,7 +1077,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, auto output = AllocateBlock(output_len, allocator); #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); - Slice compression_dict = info.dict().GetRawDict(); + const Slice& compression_dict = info.dict().GetRawDict(); if (compression_dict.size()) { LZ4_setStreamDecode(stream, compression_dict.data(), static_cast(compression_dict.size()));