From 674cf417325cfe78ac7ce6389f2685acce061e65 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 12 Mar 2020 21:39:36 -0700 Subject: [PATCH] Divide block_based_table_reader.cc (#6527) Summary: block_based_table_reader.cc is a giant file, which makes it hard for users to navigate the code. Divide the files to multiple files. Some class templates cannot be moved to .cc file. They are moved to .h files. It is still better than including them all in block_based_table_reader.cc. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6527 Test Plan: "make all check" and "make release". Also build using cmake. Differential Revision: D20428455 fbshipit-source-id: ca713c698469f07f35bc0c271358c0874ed4eb28 --- CMakeLists.txt | 5 + TARGETS | 5 + src.mk | 5 + .../block_based/binary_search_index_reader.cc | 73 + .../block_based/binary_search_index_reader.h | 48 + .../block_based/block_based_table_iterator.h | 657 +++++++++ table/block_based/block_based_table_reader.cc | 1196 +---------------- table/block_based/block_based_table_reader.h | 231 +--- .../block_based_table_reader_impl.h | 190 +++ table/block_based/hash_index_reader.cc | 146 ++ table/block_based/hash_index_reader.h | 49 + table/block_based/index_reader_common.cc | 54 + table/block_based/index_reader_common.h | 85 ++ table/block_based/partitioned_index_reader.cc | 175 +++ table/block_based/partitioned_index_reader.h | 51 + table/block_based/reader_common.cc | 47 + table/block_based/reader_common.h | 33 + 17 files changed, 1629 insertions(+), 1421 deletions(-) create mode 100644 table/block_based/binary_search_index_reader.cc create mode 100644 table/block_based/binary_search_index_reader.h create mode 100644 table/block_based/block_based_table_iterator.h create mode 100644 table/block_based/block_based_table_reader_impl.h create mode 100644 table/block_based/hash_index_reader.cc create mode 100644 table/block_based/hash_index_reader.h create mode 100644 table/block_based/index_reader_common.cc create mode 100644 table/block_based/index_reader_common.h create mode 100644 table/block_based/partitioned_index_reader.cc create mode 100644 table/block_based/partitioned_index_reader.h create mode 100644 table/block_based/reader_common.cc create mode 100644 table/block_based/reader_common.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d7d0aed4f9..35ec24dcb7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -620,6 +620,7 @@ set(SOURCES options/options_sanity_check.cc port/stack_trace.cc table/adaptive/adaptive_table_factory.cc + table/block_based/binary_search_index_reader.cc table/block_based/block.cc table/block_based/block_based_filter_block.cc table/block_based/block_based_table_builder.cc @@ -633,9 +634,13 @@ set(SOURCES table/block_based/filter_policy.cc table/block_based/flush_block_policy.cc table/block_based/full_filter_block.cc + table/block_based/hash_index_reader.cc table/block_based/index_builder.cc + table/block_based/index_reader_common.cc table/block_based/parsed_full_filter_block.cc table/block_based/partitioned_filter_block.cc + table/block_based/partitioned_index_reader.cc + table/block_based/reader_common.cc table/block_based/uncompression_dict_reader.cc table/block_fetcher.cc table/cuckoo/cuckoo_table_builder.cc diff --git a/TARGETS b/TARGETS index 60eec57cd2..9e5ea51cb5 100644 --- a/TARGETS +++ b/TARGETS @@ -231,6 +231,7 @@ cpp_library( "port/port_posix.cc", "port/stack_trace.cc", "table/adaptive/adaptive_table_factory.cc", + "table/block_based/binary_search_index_reader.cc", "table/block_based/block.cc", 
"table/block_based/block_based_filter_block.cc", "table/block_based/block_based_table_builder.cc", @@ -244,9 +245,13 @@ cpp_library( "table/block_based/filter_policy.cc", "table/block_based/flush_block_policy.cc", "table/block_based/full_filter_block.cc", + "table/block_based/hash_index_reader.cc", "table/block_based/index_builder.cc", + "table/block_based/index_reader_common.cc", "table/block_based/parsed_full_filter_block.cc", "table/block_based/partitioned_filter_block.cc", + "table/block_based/partitioned_index_reader.cc", + "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", "table/cuckoo/cuckoo_table_builder.cc", diff --git a/src.mk b/src.mk index 50431e6692..08b5609421 100644 --- a/src.mk +++ b/src.mk @@ -119,6 +119,7 @@ LIB_SOURCES = \ port/port_posix.cc \ port/stack_trace.cc \ table/adaptive/adaptive_table_factory.cc \ + table/block_based/binary_search_index_reader.cc \ table/block_based/block.cc \ table/block_based/block_based_filter_block.cc \ table/block_based/block_based_table_builder.cc \ @@ -132,9 +133,13 @@ LIB_SOURCES = \ table/block_based/filter_policy.cc \ table/block_based/flush_block_policy.cc \ table/block_based/full_filter_block.cc \ + table/block_based/hash_index_reader.cc \ table/block_based/index_builder.cc \ + table/block_based/index_reader_common.cc \ table/block_based/parsed_full_filter_block.cc \ table/block_based/partitioned_filter_block.cc \ + table/block_based/partitioned_index_reader.cc \ + table/block_based/reader_common.cc \ table/block_based/uncompression_dict_reader.cc \ table/block_fetcher.cc \ table/cuckoo/cuckoo_table_builder.cc \ diff --git a/table/block_based/binary_search_index_reader.cc b/table/block_based/binary_search_index_reader.cc new file mode 100644 index 0000000000..8c938c9245 --- /dev/null +++ b/table/block_based/binary_search_index_reader.cc @@ -0,0 +1,73 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "table/block_based/binary_search_index_reader.h" + +namespace ROCKSDB_NAMESPACE { +Status BinarySearchIndexReader::Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + index_reader->reset( + new BinarySearchIndexReader(table, std::move(index_block))); + + return Status::OK(); +} + +InternalIteratorBase* BinarySearchIndexReader::NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const BlockBasedTable::Rep* rep = table()->get_rep(); + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + Statistics* kNullStats = nullptr; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + auto it = index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), index_value_is_full()); + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/binary_search_index_reader.h b/table/block_based/binary_search_index_reader.h new file mode 100644 index 0000000000..e8a05d51eb --- /dev/null +++ b/table/block_based/binary_search_index_reader.h @@ -0,0 +1,48 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Index that allows binary search lookup for the first key of each block. +// This class can be viewed as a thin wrapper for `Block` class which already +// supports binary search. +class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { + public: + // Read index from the file and create an intance for + // `BinarySearchIndexReader`. + // On success, index_reader will be populated; otherwise it will remain + // unmodified. 
+ static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return usage; + } + + private: + BinarySearchIndexReader(const BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h new file mode 100644 index 0000000000..1eadc5fc82 --- /dev/null +++ b/table/block_based/block_based_table_iterator.h @@ -0,0 +1,657 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/block_based_table_reader_impl.h" +#include "table/block_based/reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Iterates over the contents of BlockBasedTable. 
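One detail worth calling out before the class body that follows: when the index records each block's first key, the iterator can report that key without reading the data block at all (the is_at_first_key_from_index_ state below), and only materializes the block once value() is called or the cursor advances into it. The following is a minimal standalone sketch of that deferral under invented, simplified types (IndexEntry, LazyBlockCursor); an in-memory vector stands in for a disk block, so this is an illustration of the idea rather than the real iterator:

    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-ins: one data "block" plus the first key that the
    // index entry records for it, as IndexValue::first_internal_key does.
    struct IndexEntry {
      std::string first_key;
      std::vector<std::pair<std::string, std::string>> block;  // pretend disk block
    };

    class LazyBlockCursor {
     public:
      explicit LazyBlockCursor(const IndexEntry* e) : entry_(e) {}

      // key() can be answered from the index entry alone; no block read happens.
      const std::string& key() const {
        return loaded_ ? entry_->block[pos_].first : entry_->first_key;
      }

      // value() has to materialize the block first.
      const std::string& value() {
        if (!loaded_) {
          Materialize();
        }
        return entry_->block[pos_].second;
      }

     private:
      void Materialize() {
        // The real iterator runs InitDataBlock()/SeekToFirst() here and treats
        // a mismatch with the index-provided first key as corruption.
        assert(!entry_->block.empty() &&
               entry_->block[0].first == entry_->first_key);
        loaded_ = true;
        pos_ = 0;
      }

      const IndexEntry* entry_;
      bool loaded_ = false;
      std::size_t pos_ = 0;
    };

    int main() {
      IndexEntry e{"k1", {{"k1", "v1"}, {"k2", "v2"}}};
      LazyBlockCursor cursor(&e);
      std::cout << cursor.key() << "\n";    // served from the index entry
      std::cout << cursor.value() << "\n";  // first touch of the block contents
      return 0;
    }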
+template +class BlockBasedTableIterator : public InternalIteratorBase { + // compaction_readahead_size: its value will only be used if for_compaction = + // true + public: + BlockBasedTableIterator(const BlockBasedTable* table, + const ReadOptions& read_options, + const InternalKeyComparator& icomp, + InternalIteratorBase* index_iter, + bool check_filter, bool need_upper_bound_check, + const SliceTransform* prefix_extractor, + BlockType block_type, TableReaderCaller caller, + size_t compaction_readahead_size = 0) + : table_(table), + read_options_(read_options), + icomp_(icomp), + user_comparator_(icomp.user_comparator()), + index_iter_(index_iter), + pinned_iters_mgr_(nullptr), + block_iter_points_to_real_block_(false), + check_filter_(check_filter), + need_upper_bound_check_(need_upper_bound_check), + prefix_extractor_(prefix_extractor), + block_type_(block_type), + lookup_context_(caller), + compaction_readahead_size_(compaction_readahead_size) {} + + ~BlockBasedTableIterator() { delete index_iter_; } + + void Seek(const Slice& target) override; + void SeekForPrev(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + void Next() final override; + bool NextAndGetResult(IterateResult* result) override; + void Prev() override; + bool Valid() const override { + return !is_out_of_bound_ && + (is_at_first_key_from_index_ || + (block_iter_points_to_real_block_ && block_iter_.Valid())); + } + Slice key() const override { + assert(Valid()); + if (is_at_first_key_from_index_) { + return index_iter_->value().first_internal_key; + } else { + return block_iter_.key(); + } + } + Slice user_key() const override { + assert(Valid()); + if (is_at_first_key_from_index_) { + return ExtractUserKey(index_iter_->value().first_internal_key); + } else { + return block_iter_.user_key(); + } + } + TValue value() const override { + assert(Valid()); + + // Load current block if not loaded. + if (is_at_first_key_from_index_ && + !const_cast(this) + ->MaterializeCurrentBlock()) { + // Oops, index is not consistent with block contents, but we have + // no good way to report error at this point. Let's return empty value. + return TValue(); + } + + return block_iter_.value(); + } + Status status() const override { + // Prefix index set status to NotFound when the prefix does not exist + if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) { + return index_iter_->status(); + } else if (block_iter_points_to_real_block_) { + return block_iter_.status(); + } else { + return Status::OK(); + } + } + + // Whether iterator invalidated for being out of bound. + bool IsOutOfBound() override { return is_out_of_bound_; } + + inline bool MayBeOutOfUpperBound() override { + assert(Valid()); + return !data_block_within_upper_bound_; + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + } + bool IsKeyPinned() const override { + // Our key comes either from block_iter_'s current key + // or index_iter_'s current *value*. + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) || + (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned())); + } + bool IsValuePinned() const override { + // Load current block if not loaded. + if (is_at_first_key_from_index_) { + const_cast(this)->MaterializeCurrentBlock(); + } + // BlockIter::IsValuePinned() is always true. 
No need to check + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + block_iter_points_to_real_block_; + } + + void ResetDataIter() { + if (block_iter_points_to_real_block_) { + if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) { + block_iter_.DelegateCleanupsTo(pinned_iters_mgr_); + } + block_iter_.Invalidate(Status::OK()); + block_iter_points_to_real_block_ = false; + } + } + + void SavePrevIndexValue() { + if (block_iter_points_to_real_block_) { + // Reseek. If they end up with the same data block, we shouldn't re-fetch + // the same data block. + prev_block_offset_ = index_iter_->value().handle.offset(); + } + } + + private: + enum class IterDirection { + kForward, + kBackward, + }; + + const BlockBasedTable* table_; + const ReadOptions read_options_; + const InternalKeyComparator& icomp_; + UserComparatorWrapper user_comparator_; + InternalIteratorBase* index_iter_; + PinnedIteratorsManager* pinned_iters_mgr_; + TBlockIter block_iter_; + + // True if block_iter_ is initialized and points to the same block + // as index iterator. + bool block_iter_points_to_real_block_; + // See InternalIteratorBase::IsOutOfBound(). + bool is_out_of_bound_ = false; + // Whether current data block being fully within iterate upper bound. + bool data_block_within_upper_bound_ = false; + // True if we're standing at the first key of a block, and we haven't loaded + // that block yet. A call to value() will trigger loading the block. + bool is_at_first_key_from_index_ = false; + bool check_filter_; + // TODO(Zhongyi): pick a better name + bool need_upper_bound_check_; + const SliceTransform* prefix_extractor_; + BlockType block_type_; + uint64_t prev_block_offset_ = std::numeric_limits::max(); + BlockCacheLookupContext lookup_context_; + // Readahead size used in compaction, its value is used only if + // lookup_context_.caller = kCompaction. + size_t compaction_readahead_size_; + + size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; + size_t readahead_limit_ = 0; + int64_t num_file_reads_ = 0; + std::unique_ptr prefetch_buffer_; + + // If `target` is null, seek to first. + void SeekImpl(const Slice* target); + + void InitDataBlock(); + bool MaterializeCurrentBlock(); + void FindKeyForward(); + void FindBlockForward(); + void FindKeyBackward(); + void CheckOutOfBound(); + + // Check if data block is fully within iterate_upper_bound. + // + // Note MyRocks may update iterate bounds between seek. To workaround it, + // we need to check and update data_block_within_upper_bound_ accordingly. + void CheckDataBlockWithinUpperBound(); + + bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) { + if (need_upper_bound_check_ && direction == IterDirection::kBackward) { + // Upper bound check isn't sufficnet for backward direction to + // guarantee the same result as total order, so disable prefix + // check. + return true; + } + if (check_filter_ && + !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_, + need_upper_bound_check_, &lookup_context_)) { + // TODO remember the iterator is invalidated because of prefix + // match. This can avoid the upper level file iterator to falsely + // believe the position is the end of the SST file and move to + // the first key of the next file. + ResetDataIter(); + return false; + } + return true; + } +}; + +// Functions below cannot be moved to .cc file because the class is a template +// The template is in place so that block based table iterator can be served +// partitioned index too. 
However, the logic is kind of different between the +// two. So we may think of de-template them by having a separate iterator +// for partitioned index. + +template +void BlockBasedTableIterator::Seek(const Slice& target) { + SeekImpl(&target); +} + +template +void BlockBasedTableIterator::SeekToFirst() { + SeekImpl(nullptr); +} + +template +void BlockBasedTableIterator::SeekImpl( + const Slice* target) { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { + ResetDataIter(); + return; + } + + bool need_seek_index = true; + if (block_iter_points_to_real_block_ && block_iter_.Valid()) { + // Reseek. + prev_block_offset_ = index_iter_->value().handle.offset(); + + if (target) { + // We can avoid an index seek if: + // 1. The new seek key is larger than the current key + // 2. The new seek key is within the upper bound of the block + // Since we don't necessarily know the internal key for either + // the current key or the upper bound, we check user keys and + // exclude the equality case. Considering internal keys can + // improve for the boundary cases, but it would complicate the + // code. + if (user_comparator_.Compare(ExtractUserKey(*target), + block_iter_.user_key()) > 0 && + user_comparator_.Compare(ExtractUserKey(*target), + index_iter_->user_key()) < 0) { + need_seek_index = false; + } + } + } + + if (need_seek_index) { + if (target) { + index_iter_->Seek(*target); + } else { + index_iter_->SeekToFirst(); + } + + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + } + + IndexValue v = index_iter_->value(); + const bool same_block = block_iter_points_to_real_block_ && + v.handle.offset() == prev_block_offset_; + + // TODO(kolmike): Remove the != kBlockCacheTier condition. + if (!v.first_internal_key.empty() && !same_block && + (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) && + read_options_.read_tier != kBlockCacheTier) { + // Index contains the first key of the block, and it's >= target. + // We can defer reading the block. + is_at_first_key_from_index_ = true; + // ResetDataIter() will invalidate block_iter_. Thus, there is no need to + // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound + // as that will be done later when the data block is actually read. + ResetDataIter(); + } else { + // Need to use the data block. + if (!same_block) { + InitDataBlock(); + } else { + // When the user does a reseek, the iterate_upper_bound might have + // changed. CheckDataBlockWithinUpperBound() needs to be called + // explicitly if the reseek ends up in the same data block. + // If the reseek ends up in a different block, InitDataBlock() will do + // the iterator upper bound check. + CheckDataBlockWithinUpperBound(); + } + + if (target) { + block_iter_.Seek(*target); + } else { + block_iter_.SeekToFirst(); + } + FindKeyForward(); + } + + CheckOutOfBound(); + + if (target) { + assert(!Valid() || ((block_type_ == BlockType::kIndex && + !table_->get_rep()->index_key_includes_seq) + ? 
(user_comparator_.Compare(ExtractUserKey(*target), + key()) <= 0) + : (icomp_.Compare(*target, key()) <= 0))); + } +} + +template +void BlockBasedTableIterator::SeekForPrev( + const Slice& target) { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + // For now totally disable prefix seek in auto prefix mode because we don't + // have logic + if (!CheckPrefixMayMatch(target, IterDirection::kBackward)) { + ResetDataIter(); + return; + } + + SavePrevIndexValue(); + + // Call Seek() rather than SeekForPrev() in the index block, because the + // target data block will likely to contain the position for `target`, the + // same as Seek(), rather than than before. + // For example, if we have three data blocks, each containing two keys: + // [2, 4] [6, 8] [10, 12] + // (the keys in the index block would be [4, 8, 12]) + // and the user calls SeekForPrev(7), we need to go to the second block, + // just like if they call Seek(7). + // The only case where the block is difference is when they seek to a position + // in the boundary. For example, if they SeekForPrev(5), we should go to the + // first block, rather than the second. However, we don't have the information + // to distinguish the two unless we read the second block. In this case, we'll + // end up with reading two blocks. + index_iter_->Seek(target); + + if (!index_iter_->Valid()) { + auto seek_status = index_iter_->status(); + // Check for IO error + if (!seek_status.IsNotFound() && !seek_status.ok()) { + ResetDataIter(); + return; + } + + // With prefix index, Seek() returns NotFound if the prefix doesn't exist + if (seek_status.IsNotFound()) { + // Any key less than the target is fine for prefix seek + ResetDataIter(); + return; + } else { + index_iter_->SeekToLast(); + } + // Check for IO error + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + } + + InitDataBlock(); + + block_iter_.SeekForPrev(target); + + FindKeyBackward(); + CheckDataBlockWithinUpperBound(); + assert(!block_iter_.Valid() || + icomp_.Compare(target, block_iter_.key()) >= 0); +} + +template +void BlockBasedTableIterator::SeekToLast() { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + SavePrevIndexValue(); + index_iter_->SeekToLast(); + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + InitDataBlock(); + block_iter_.SeekToLast(); + FindKeyBackward(); + CheckDataBlockWithinUpperBound(); +} + +template +void BlockBasedTableIterator::Next() { + if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) { + return; + } + assert(block_iter_points_to_real_block_); + block_iter_.Next(); + FindKeyForward(); + CheckOutOfBound(); +} + +template +bool BlockBasedTableIterator::NextAndGetResult( + IterateResult* result) { + Next(); + bool is_valid = Valid(); + if (is_valid) { + result->key = key(); + result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + } + return is_valid; +} + +template +void BlockBasedTableIterator::Prev() { + if (is_at_first_key_from_index_) { + is_at_first_key_from_index_ = false; + + index_iter_->Prev(); + if (!index_iter_->Valid()) { + return; + } + + InitDataBlock(); + block_iter_.SeekToLast(); + } else { + assert(block_iter_points_to_real_block_); + block_iter_.Prev(); + } + + FindKeyBackward(); +} + +template +void BlockBasedTableIterator::InitDataBlock() { + BlockHandle data_block_handle = index_iter_->value().handle; + if (!block_iter_points_to_real_block_ || + data_block_handle.offset() != prev_block_offset_ || + // if previous attempt of reading the block missed cache, try 
again + block_iter_.status().IsIncomplete()) { + if (block_iter_points_to_real_block_) { + ResetDataIter(); + } + auto* rep = table_->get_rep(); + + // Prefetch additional data for range scans (iterators). Enabled only for + // user reads. + // Implicit auto readahead: + // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. + // Explicit user requested readahead: + // Enabled from the very first IO when ReadOptions.readahead_size is set. + if (lookup_context_.caller != TableReaderCaller::kCompaction) { + if (read_options_.readahead_size == 0) { + // Implicit auto readahead + num_file_reads_++; + if (num_file_reads_ > + BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { + if (!rep->file->use_direct_io() && + (data_block_handle.offset() + + static_cast(block_size(data_block_handle)) > + readahead_limit_)) { + // Buffered I/O + // Discarding the return status of Prefetch calls intentionally, as + // we can fallback to reading from disk if Prefetch fails. + rep->file->Prefetch(data_block_handle.offset(), readahead_size_); + readahead_limit_ = static_cast(data_block_handle.offset() + + readahead_size_); + // Keep exponentially increasing readahead size until + // kMaxAutoReadaheadSize. + readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize, + readahead_size_ * 2); + } else if (rep->file->use_direct_io() && !prefetch_buffer_) { + // Direct I/O + // Let FilePrefetchBuffer take care of the readahead. + rep->CreateFilePrefetchBuffer( + BlockBasedTable::kInitAutoReadaheadSize, + BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); + } + } + } else if (!prefetch_buffer_) { + // Explicit user requested readahead + // The actual condition is: + // if (read_options_.readahead_size != 0 && !prefetch_buffer_) + rep->CreateFilePrefetchBuffer(read_options_.readahead_size, + read_options_.readahead_size, + &prefetch_buffer_); + } + } else if (!prefetch_buffer_) { + rep->CreateFilePrefetchBuffer(compaction_readahead_size_, + compaction_readahead_size_, + &prefetch_buffer_); + } + + Status s; + table_->NewDataBlockIterator( + read_options_, data_block_handle, &block_iter_, block_type_, + /*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(), + /*for_compaction=*/lookup_context_.caller == + TableReaderCaller::kCompaction); + block_iter_points_to_real_block_ = true; + CheckDataBlockWithinUpperBound(); + } +} + +template +bool BlockBasedTableIterator::MaterializeCurrentBlock() { + assert(is_at_first_key_from_index_); + assert(!block_iter_points_to_real_block_); + assert(index_iter_->Valid()); + + is_at_first_key_from_index_ = false; + InitDataBlock(); + assert(block_iter_points_to_real_block_); + block_iter_.SeekToFirst(); + + if (!block_iter_.Valid() || + icomp_.Compare(block_iter_.key(), + index_iter_->value().first_internal_key) != 0) { + // Uh oh. + block_iter_.Invalidate(Status::Corruption( + "first key in index doesn't match first key in block")); + return false; + } + + return true; +} + +template +void BlockBasedTableIterator::FindKeyForward() { + // This method's code is kept short to make it likely to be inlined. + + assert(!is_out_of_bound_); + assert(block_iter_points_to_real_block_); + + if (!block_iter_.Valid()) { + // This is the only call site of FindBlockForward(), but it's extracted into + // a separate method to keep FindKeyForward() short and likely to be + // inlined. When transitioning to a different block, we call + // FindBlockForward(), which is much longer and is probably not inlined. 
+ FindBlockForward(); + } else { + // This is the fast path that avoids a function call. + } +} + +template +void BlockBasedTableIterator::FindBlockForward() { + // TODO the while loop inherits from two-level-iterator. We don't know + // whether a block can be empty so it can be replaced by an "if". + do { + if (!block_iter_.status().ok()) { + return; + } + // Whether next data block is out of upper bound, if there is one. + const bool next_block_is_out_of_bound = + read_options_.iterate_upper_bound != nullptr && + block_iter_points_to_real_block_ && !data_block_within_upper_bound_; + assert(!next_block_is_out_of_bound || + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), /*b_has_ts=*/true) <= 0); + ResetDataIter(); + index_iter_->Next(); + if (next_block_is_out_of_bound) { + // The next block is out of bound. No need to read it. + TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound", nullptr); + // We need to make sure this is not the last data block before setting + // is_out_of_bound_, since the index key for the last data block can be + // larger than smallest key of the next file on the same level. + if (index_iter_->Valid()) { + is_out_of_bound_ = true; + } + return; + } + + if (!index_iter_->Valid()) { + return; + } + + IndexValue v = index_iter_->value(); + + // TODO(kolmike): Remove the != kBlockCacheTier condition. + if (!v.first_internal_key.empty() && + read_options_.read_tier != kBlockCacheTier) { + // Index contains the first key of the block. Defer reading the block. + is_at_first_key_from_index_ = true; + return; + } + + InitDataBlock(); + block_iter_.SeekToFirst(); + } while (!block_iter_.Valid()); +} + +template +void BlockBasedTableIterator::FindKeyBackward() { + while (!block_iter_.Valid()) { + if (!block_iter_.status().ok()) { + return; + } + + ResetDataIter(); + index_iter_->Prev(); + + if (index_iter_->Valid()) { + InitDataBlock(); + block_iter_.SeekToLast(); + } else { + return; + } + } + + // We could have check lower bound here too, but we opt not to do it for + // code simplicity. 
+} + +template +void BlockBasedTableIterator::CheckOutOfBound() { + if (read_options_.iterate_upper_bound != nullptr && Valid()) { + is_out_of_bound_ = + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(), + /*b_has_ts=*/true) <= 0; + } +} + +template +void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() { + if (read_options_.iterate_upper_bound != nullptr && + block_iter_points_to_real_block_) { + data_block_within_upper_bound_ = + (user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), + /*b_has_ts=*/true) > 0); + } +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 6a0d8ef5f9..3c619d9ea2 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -31,13 +31,17 @@ #include "rocksdb/table.h" #include "rocksdb/table_properties.h" +#include "table/block_based/binary_search_index_reader.h" #include "table/block_based/block.h" #include "table/block_based/block_based_filter_block.h" #include "table/block_based/block_based_table_factory.h" +#include "table/block_based/block_based_table_iterator.h" #include "table/block_based/block_prefix_index.h" #include "table/block_based/filter_block.h" #include "table/block_based/full_filter_block.h" +#include "table/block_based/hash_index_reader.h" #include "table/block_based/partitioned_filter_block.h" +#include "table/block_based/partitioned_index_reader.h" #include "table/block_fetcher.h" #include "table/format.h" #include "table/get_context.h" @@ -175,20 +179,6 @@ Status ReadBlockFromFile( return s; } -inline MemoryAllocator* GetMemoryAllocator( - const BlockBasedTableOptions& table_options) { - return table_options.block_cache.get() - ? table_options.block_cache->memory_allocator() - : nullptr; -} - -inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( - const BlockBasedTableOptions& table_options) { - return table_options.block_cache_compressed.get() - ? table_options.block_cache_compressed->memory_allocator() - : nullptr; -} - // Delete the entry resided in the cache. template void DeleteCachedEntry(const Slice& /*key*/, void* value) { @@ -196,13 +186,6 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { delete entry; } -// Release the cached entry and decrement its ref count. -void ForceReleaseCachedEntry(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle, true /* force_erase */); -} - // Release the cached entry and decrement its ref count. // Do not force erase void ReleaseCachedEntry(void* arg, void* h) { @@ -239,555 +222,8 @@ CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) { memcpy(heap_buf.get(), buf.data(), buf.size()); return heap_buf; } - } // namespace -// Encapsulates common functionality for the various index reader -// implementations. Provides access to the index block regardless of whether -// it is owned by the reader or stored in the cache, or whether it is pinned -// in the cache or not. 
-class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { - public: - IndexReaderCommon(const BlockBasedTable* t, - CachableEntry&& index_block) - : table_(t), index_block_(std::move(index_block)) { - assert(table_ != nullptr); - } - - protected: - static Status ReadIndexBlock(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, - const ReadOptions& read_options, bool use_cache, - GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block); - - const BlockBasedTable* table() const { return table_; } - - const InternalKeyComparator* internal_comparator() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - - return &table_->get_rep()->internal_comparator; - } - - bool index_has_first_key() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_has_first_key; - } - - bool index_key_includes_seq() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_key_includes_seq; - } - - bool index_value_is_full() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_value_is_full; - } - - bool cache_index_blocks() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->table_options.cache_index_and_filter_blocks; - } - - Status GetOrReadIndexBlock(bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) const; - - size_t ApproximateIndexBlockMemoryUsage() const { - assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); - return index_block_.GetOwnValue() - ? index_block_.GetValue()->ApproximateMemoryUsage() - : 0; - } - - private: - const BlockBasedTable* table_; - CachableEntry index_block_; -}; - -Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - const ReadOptions& read_options, bool use_cache, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) { - PERF_TIMER_GUARD(read_index_block_nanos); - - assert(table != nullptr); - assert(index_block != nullptr); - assert(index_block->IsEmpty()); - - const Rep* const rep = table->get_rep(); - assert(rep != nullptr); - - const Status s = table->RetrieveBlock( - prefetch_buffer, read_options, rep->footer.index_handle(), - UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex, - get_context, lookup_context, /* for_compaction */ false, use_cache); - - return s; -} - -Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( - bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) const { - assert(index_block != nullptr); - - if (!index_block_.IsEmpty()) { - index_block->SetUnownedValue(index_block_.GetValue()); - return Status::OK(); - } - - ReadOptions read_options; - if (no_io) { - read_options.read_tier = kBlockCacheTier; - } - - return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options, - cache_index_blocks(), get_context, lookup_context, - index_block); -} - -// Index that allows binary search lookup in a two-level index structure. -class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - // Read the partition index from the file and create an instance for - // `PartitionIndexReader`. 
- // On success, index_reader will be populated; otherwise it will remain - // unmodified. - static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(table->get_rep()); - assert(!pin || prefetch); - assert(index_reader != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - index_reader->reset( - new PartitionIndexReader(table, std::move(index_block))); - - return Status::OK(); - } - - // return a two-level iterator: first level is on the partition index - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool /* disable_prefix_seek */, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - const BlockBasedTable::Rep* rep = table()->rep_; - InternalIteratorBase* it = nullptr; - - Statistics* kNullStats = nullptr; - // Filters are already checked before seeking the index - if (!partition_map_.empty()) { - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - it = NewTwoLevelIterator( - new BlockBasedTable::PartitionedIndexIteratorState(table(), - &partition_map_), - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, - true, index_has_first_key(), index_key_includes_seq(), - index_value_is_full())); - } else { - ReadOptions ro; - ro.fill_cache = read_options.fill_cache; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - it = new BlockBasedTableIterator( - table(), ro, *internal_comparator(), - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, - true, index_has_first_key(), index_key_includes_seq(), - index_value_is_full()), - false, true, /* prefix_extractor */ nullptr, BlockType::kIndex, - lookup_context ? lookup_context->caller - : TableReaderCaller::kUncategorized); - } - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - - // TODO(myabandeh): Update TwoLevelIterator to be able to make use of - // on-stack BlockIter while the state is on heap. Currentlly it assumes - // the first level iter is always on heap and will attempt to delete it - // in its destructor. 
- } - - void CacheDependencies(bool pin) override { - // Before read partitions, prefetch them to avoid lots of IOs - BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch}; - const BlockBasedTable::Rep* rep = table()->rep_; - IndexBlockIter biter; - BlockHandle handle; - Statistics* kNullStats = nullptr; - - CachableEntry index_block; - Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */, - &lookup_context, &index_block); - if (!s.ok()) { - ROCKS_LOG_WARN(rep->ioptions.info_log, - "Error retrieving top-level index block while trying to " - "cache index partitions: %s", - s.ToString().c_str()); - return; - } - - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true, - index_has_first_key(), index_key_includes_seq(), index_value_is_full()); - // Index partitions are assumed to be consecuitive. Prefetch them all. - // Read the first block offset - biter.SeekToFirst(); - if (!biter.Valid()) { - // Empty index. - return; - } - handle = biter.value().handle; - uint64_t prefetch_off = handle.offset(); - - // Read the last block's offset - biter.SeekToLast(); - if (!biter.Valid()) { - // Empty index. - return; - } - handle = biter.value().handle; - uint64_t last_off = handle.offset() + block_size(handle); - uint64_t prefetch_len = last_off - prefetch_off; - std::unique_ptr prefetch_buffer; - rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer); - s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, - static_cast(prefetch_len)); - - // After prefetch, read the partitions one by one - biter.SeekToFirst(); - auto ro = ReadOptions(); - for (; biter.Valid(); biter.Next()) { - handle = biter.value().handle; - CachableEntry block; - // TODO: Support counter batch update for partitioned index and - // filter blocks - s = table()->MaybeReadBlockAndLoadToCache( - prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), - &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context, - /*contents=*/nullptr); - - assert(s.ok() || block.GetValue() == nullptr); - if (s.ok() && block.GetValue() != nullptr) { - if (block.IsCached()) { - if (pin) { - partition_map_[handle.offset()] = std::move(block); - } - } - } - } - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - // TODO(myabandeh): more accurate estimate of partition_map_ mem usage - return usage; - } - - private: - PartitionIndexReader(const BlockBasedTable* t, - CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} - - std::unordered_map> partition_map_; -}; - -// Index that allows binary search lookup for the first key of each block. -// This class can be viewed as a thin wrapper for `Block` class which already -// supports binary search. -class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - // Read index from the file and create an intance for - // `BinarySearchIndexReader`. - // On success, index_reader will be populated; otherwise it will remain - // unmodified. 
- static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(table->get_rep()); - assert(!pin || prefetch); - assert(index_reader != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - index_reader->reset( - new BinarySearchIndexReader(table, std::move(index_block))); - - return Status::OK(); - } - - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool /* disable_prefix_seek */, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const BlockBasedTable::Rep* rep = table()->get_rep(); - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - Statistics* kNullStats = nullptr; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - auto it = index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true, - index_has_first_key(), index_key_includes_seq(), index_value_is_full()); - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - return usage; - } - - private: - BinarySearchIndexReader(const BlockBasedTable* t, - CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} -}; - -// Index that leverages an internal hash table to quicken the lookup for a given -// key. -class HashIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, - InternalIterator* meta_index_iter, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(index_reader != nullptr); - assert(!pin || prefetch); - - const BlockBasedTable::Rep* rep = table->get_rep(); - assert(rep != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - // Note, failure to create prefix hash index does not need to be a - // hard error. We can still fall back to the original binary search index. - // So, Create will succeed regardless, from this point on. 
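That "succeed regardless" shape, install the reader first and treat the prefix hash index purely as a best-effort accelerator, is what keeps reads working even when the prefixes block or its metadata block cannot be read. Below is a compressed sketch of just that control flow, using invented simplified types (Result, HashReader, LoadPrefixIndex) rather than the real RocksDB signatures, and with the load deliberately failing to show the fallback:

    #include <cstdio>
    #include <memory>
    #include <utility>

    // Hypothetical, heavily simplified stand-ins; not the real RocksDB types.
    struct PrefixIndex {};

    struct Result {
      bool ok;
    };

    // Stands in for reading the prefixes / prefixes-metadata blocks and building
    // a BlockPrefixIndex; here it just pretends the metadata block is missing.
    Result LoadPrefixIndex(std::unique_ptr<PrefixIndex>* out) {
      out->reset();
      return Result{false};
    }

    struct HashReader {
      std::unique_ptr<PrefixIndex> prefix_index;  // optional accelerator
    };

    Result CreateHashReader(std::unique_ptr<HashReader>* out) {
      out->reset(new HashReader());  // the reader is usable from this point on
      std::unique_ptr<PrefixIndex> idx;
      if (LoadPrefixIndex(&idx).ok) {
        (*out)->prefix_index = std::move(idx);  // prefix lookups enabled
      }
      // On failure prefix_index stays empty and lookups fall back to the plain
      // binary-search index, so creation still reports success.
      return Result{true};
    }

    int main() {
      std::unique_ptr<HashReader> reader;
      Result r = CreateHashReader(&reader);
      std::printf("create ok: %d, prefix index built: %d\n", r.ok,
                  reader->prefix_index != nullptr);
      return 0;
    }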
- - index_reader->reset(new HashIndexReader(table, std::move(index_block))); - - // Get prefixes block - BlockHandle prefixes_handle; - Status s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, - &prefixes_handle); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - // Get index metadata block - BlockHandle prefixes_meta_handle; - s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock, - &prefixes_meta_handle); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - RandomAccessFileReader* const file = rep->file.get(); - const Footer& footer = rep->footer; - const ImmutableCFOptions& ioptions = rep->ioptions; - const PersistentCacheOptions& cache_options = rep->persistent_cache_options; - MemoryAllocator* const memory_allocator = - GetMemoryAllocator(rep->table_options); - - // Read contents for the blocks - BlockContents prefixes_contents; - BlockFetcher prefixes_block_fetcher( - file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, BlockType::kHashIndexPrefixes, - UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); - s = prefixes_block_fetcher.ReadBlockContents(); - if (!s.ok()) { - return s; - } - BlockContents prefixes_meta_contents; - BlockFetcher prefixes_meta_block_fetcher( - file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, BlockType::kHashIndexMetadata, - UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); - s = prefixes_meta_block_fetcher.ReadBlockContents(); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - BlockPrefixIndex* prefix_index = nullptr; - assert(rep->internal_prefix_transform.get() != nullptr); - s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), - prefixes_contents.data, - prefixes_meta_contents.data, &prefix_index); - // TODO: log error - if (s.ok()) { - HashIndexReader* const hash_index_reader = - static_cast(index_reader->get()); - hash_index_reader->prefix_index_.reset(prefix_index); - } - - return Status::OK(); - } - - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool disable_prefix_seek, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const BlockBasedTable::Rep* rep = table()->get_rep(); - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - Statistics* kNullStats = nullptr; - const bool total_order_seek = - read_options.total_order_seek || disable_prefix_seek; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. 
- auto it = index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, - total_order_seek, index_has_first_key(), index_key_includes_seq(), - index_value_is_full(), false /* block_contents_pinned */, - prefix_index_.get()); - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - if (prefix_index_) { - usage += prefix_index_->ApproximateMemoryUsage(); - } - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - return usage; - } - - private: - HashIndexReader(const BlockBasedTable* t, CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} - - std::unique_ptr prefix_index_; -}; - void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context, size_t usage) const { @@ -1295,32 +731,6 @@ Status BlockBasedTable::PrefetchTail( return s; } -Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, - uint32_t expected) { - Status s; - uint32_t actual = 0; - switch (type) { - case kNoChecksum: - break; - case kCRC32c: - expected = crc32c::Unmask(expected); - actual = crc32c::Value(buf, len); - break; - case kxxHash: - actual = XXH32(buf, static_cast(len), 0); - break; - case kxxHash64: - actual = static_cast(XXH64(buf, static_cast(len), 0) & - uint64_t{0xffffffff}); - break; - default: - s = Status::Corruption("unknown checksum type"); - } - if (s.ok() && actual != expected) { - s = Status::Corruption("properties block checksum mismatched"); - } - return s; -} Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, @@ -1931,106 +1341,6 @@ InternalIteratorBase* BlockBasedTable::NewIndexIterator( lookup_context); } -// Convert an index iterator value (i.e., an encoded BlockHandle) -// into an iterator over the contents of the corresponding block. -// If input_iter is null, new a iterator -// If input_iter is not null, update this iter and return it -template -TBlockIter* BlockBasedTable::NewDataBlockIterator( - const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter, - BlockType block_type, GetContext* get_context, - BlockCacheLookupContext* lookup_context, Status s, - FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const { - PERF_TIMER_GUARD(new_table_block_iter_nanos); - - TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - - CachableEntry uncompression_dict; - if (rep_->uncompression_dict_reader) { - const bool no_io = (ro.read_tier == kBlockCacheTier); - s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( - prefetch_buffer, no_io, get_context, lookup_context, - &uncompression_dict); - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - } - - const UncompressionDict& dict = uncompression_dict.GetValue() - ? 
*uncompression_dict.GetValue() - : UncompressionDict::GetEmptyDict(); - - CachableEntry block; - s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type, - get_context, lookup_context, for_compaction, - /* use_cache */ true); - - if (!s.ok()) { - assert(block.IsEmpty()); - iter->Invalidate(s); - return iter; - } - - assert(block.GetValue() != nullptr); - - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - const bool block_contents_pinned = - block.IsCached() || - (!block.GetValue()->own_bytes() && rep_->immortal_table); - iter = InitBlockIterator(rep_, block.GetValue(), block_type, iter, - block_contents_pinned); - - if (!block.IsCached()) { - if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache* const block_cache = rep_->table_options.block_cache.get(); - Cache::Handle* cache_handle = nullptr; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep_->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep_->cache_key_prefix_size != 0); - assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - const Slice unique_key(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - - if (s.ok()) { - assert(cache_handle != nullptr); - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } else { - iter->SetCacheHandle(block.GetCacheHandle()); - } - - block.TransferTo(iter); - - return iter; -} template <> DataBlockIter* BlockBasedTable::InitBlockIterator( @@ -2054,75 +1364,6 @@ IndexBlockIter* BlockBasedTable::InitBlockIterator( block_contents_pinned); } -// Convert an uncompressed data block (i.e CachableEntry) -// into an iterator over the contents of the corresponding block. -// If input_iter is null, new a iterator -// If input_iter is not null, update this iter and return it -template -TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro, - CachableEntry& block, - TBlockIter* input_iter, - Status s) const { - PERF_TIMER_GUARD(new_table_block_iter_nanos); - - TBlockIter* iter = input_iter != nullptr ? 
input_iter : new TBlockIter; - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - - assert(block.GetValue() != nullptr); - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - const bool block_contents_pinned = - block.IsCached() || - (!block.GetValue()->own_bytes() && rep_->immortal_table); - iter = InitBlockIterator(rep_, block.GetValue(), BlockType::kData, - iter, block_contents_pinned); - - if (!block.IsCached()) { - if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache* const block_cache = rep_->table_options.block_cache.get(); - Cache::Handle* cache_handle = nullptr; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep_->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep_->cache_key_prefix_size != 0); - assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - const Slice unique_key(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - if (s.ok()) { - assert(cache_handle != nullptr); - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } else { - iter->SetCacheHandle(block.GetCacheHandle()); - } - - block.TransferTo(iter); - return iter; -} // If contents is nullptr, this function looks up the block caches for the // data block referenced by handle, and read the block from disk if necessary. @@ -2775,435 +2016,6 @@ bool BlockBasedTable::PrefixMayMatch( return may_match; } -template -void BlockBasedTableIterator::Seek(const Slice& target) { - SeekImpl(&target); -} - -template -void BlockBasedTableIterator::SeekToFirst() { - SeekImpl(nullptr); -} - -template -void BlockBasedTableIterator::SeekImpl( - const Slice* target) { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { - ResetDataIter(); - return; - } - - bool need_seek_index = true; - if (block_iter_points_to_real_block_ && block_iter_.Valid()) { - // Reseek. - prev_block_offset_ = index_iter_->value().handle.offset(); - - if (target) { - // We can avoid an index seek if: - // 1. The new seek key is larger than the current key - // 2. 
The new seek key is within the upper bound of the block - // Since we don't necessarily know the internal key for either - // the current key or the upper bound, we check user keys and - // exclude the equality case. Considering internal keys can - // improve for the boundary cases, but it would complicate the - // code. - if (user_comparator_.Compare(ExtractUserKey(*target), - block_iter_.user_key()) > 0 && - user_comparator_.Compare(ExtractUserKey(*target), - index_iter_->user_key()) < 0) { - need_seek_index = false; - } - } - } - - if (need_seek_index) { - if (target) { - index_iter_->Seek(*target); - } else { - index_iter_->SeekToFirst(); - } - - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - } - - IndexValue v = index_iter_->value(); - const bool same_block = block_iter_points_to_real_block_ && - v.handle.offset() == prev_block_offset_; - - // TODO(kolmike): Remove the != kBlockCacheTier condition. - if (!v.first_internal_key.empty() && !same_block && - (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) && - read_options_.read_tier != kBlockCacheTier) { - // Index contains the first key of the block, and it's >= target. - // We can defer reading the block. - is_at_first_key_from_index_ = true; - // ResetDataIter() will invalidate block_iter_. Thus, there is no need to - // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound - // as that will be done later when the data block is actually read. - ResetDataIter(); - } else { - // Need to use the data block. - if (!same_block) { - InitDataBlock(); - } else { - // When the user does a reseek, the iterate_upper_bound might have - // changed. CheckDataBlockWithinUpperBound() needs to be called - // explicitly if the reseek ends up in the same data block. - // If the reseek ends up in a different block, InitDataBlock() will do - // the iterator upper bound check. - CheckDataBlockWithinUpperBound(); - } - - if (target) { - block_iter_.Seek(*target); - } else { - block_iter_.SeekToFirst(); - } - FindKeyForward(); - } - - CheckOutOfBound(); - - if (target) { - assert(!Valid() || ((block_type_ == BlockType::kIndex && - !table_->get_rep()->index_key_includes_seq) - ? (user_comparator_.Compare(ExtractUserKey(*target), - key()) <= 0) - : (icomp_.Compare(*target, key()) <= 0))); - } -} - -template -void BlockBasedTableIterator::SeekForPrev( - const Slice& target) { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - // For now totally disable prefix seek in auto prefix mode because we don't - // have logic - if (!CheckPrefixMayMatch(target, IterDirection::kBackward)) { - ResetDataIter(); - return; - } - - SavePrevIndexValue(); - - // Call Seek() rather than SeekForPrev() in the index block, because the - // target data block will likely to contain the position for `target`, the - // same as Seek(), rather than than before. - // For example, if we have three data blocks, each containing two keys: - // [2, 4] [6, 8] [10, 12] - // (the keys in the index block would be [4, 8, 12]) - // and the user calls SeekForPrev(7), we need to go to the second block, - // just like if they call Seek(7). - // The only case where the block is difference is when they seek to a position - // in the boundary. For example, if they SeekForPrev(5), we should go to the - // first block, rather than the second. However, we don't have the information - // to distinguish the two unless we read the second block. In this case, we'll - // end up with reading two blocks. 
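// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It models the SeekForPrev() block-selection rule described in
// the comment above using plain std::vector "blocks"; ToySeekForPrev and its
// data are made-up names for the example.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

int ToySeekForPrev(const std::vector<std::vector<int>>& blocks, int target) {
  assert(!blocks.empty());
  // The index key of each block is its last key: {4, 8, 12} for the example.
  std::vector<int> index;
  for (const auto& b : blocks) index.push_back(b.back());
  // Seek (not SeekForPrev) on the index: first block whose index key >= target.
  std::size_t block_idx =
      std::lower_bound(index.begin(), index.end(), target) - index.begin();
  if (block_idx == blocks.size()) block_idx = blocks.size() - 1;
  while (true) {
    const auto& b = blocks[block_idx];
    // Largest key <= target within the chosen block, if any.
    auto ub = std::upper_bound(b.begin(), b.end(), target);
    if (ub != b.begin()) return *(ub - 1);
    if (block_idx == 0) return -1;  // no key <= target in the whole "file"
    --block_idx;  // boundary case: fall back one block (the extra block read)
  }
}
// ToySeekForPrev({{2, 4}, {6, 8}, {10, 12}}, 7) == 6  (same block as Seek(7))
// ToySeekForPrev({{2, 4}, {6, 8}, {10, 12}}, 5) == 4  (boundary case)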
- index_iter_->Seek(target); - - if (!index_iter_->Valid()) { - auto seek_status = index_iter_->status(); - // Check for IO error - if (!seek_status.IsNotFound() && !seek_status.ok()) { - ResetDataIter(); - return; - } - - // With prefix index, Seek() returns NotFound if the prefix doesn't exist - if (seek_status.IsNotFound()) { - // Any key less than the target is fine for prefix seek - ResetDataIter(); - return; - } else { - index_iter_->SeekToLast(); - } - // Check for IO error - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - } - - InitDataBlock(); - - block_iter_.SeekForPrev(target); - - FindKeyBackward(); - CheckDataBlockWithinUpperBound(); - assert(!block_iter_.Valid() || - icomp_.Compare(target, block_iter_.key()) >= 0); -} - -template -void BlockBasedTableIterator::SeekToLast() { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - SavePrevIndexValue(); - index_iter_->SeekToLast(); - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - InitDataBlock(); - block_iter_.SeekToLast(); - FindKeyBackward(); - CheckDataBlockWithinUpperBound(); -} - -template -void BlockBasedTableIterator::Next() { - if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) { - return; - } - assert(block_iter_points_to_real_block_); - block_iter_.Next(); - FindKeyForward(); - CheckOutOfBound(); -} - -template -bool BlockBasedTableIterator::NextAndGetResult( - IterateResult* result) { - Next(); - bool is_valid = Valid(); - if (is_valid) { - result->key = key(); - result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); - } - return is_valid; -} - -template -void BlockBasedTableIterator::Prev() { - if (is_at_first_key_from_index_) { - is_at_first_key_from_index_ = false; - - index_iter_->Prev(); - if (!index_iter_->Valid()) { - return; - } - - InitDataBlock(); - block_iter_.SeekToLast(); - } else { - assert(block_iter_points_to_real_block_); - block_iter_.Prev(); - } - - FindKeyBackward(); -} - -template -void BlockBasedTableIterator::InitDataBlock() { - BlockHandle data_block_handle = index_iter_->value().handle; - if (!block_iter_points_to_real_block_ || - data_block_handle.offset() != prev_block_offset_ || - // if previous attempt of reading the block missed cache, try again - block_iter_.status().IsIncomplete()) { - if (block_iter_points_to_real_block_) { - ResetDataIter(); - } - auto* rep = table_->get_rep(); - - // Prefetch additional data for range scans (iterators). Enabled only for - // user reads. - // Implicit auto readahead: - // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. - // Explicit user requested readahead: - // Enabled from the very first IO when ReadOptions.readahead_size is set. - if (lookup_context_.caller != TableReaderCaller::kCompaction) { - if (read_options_.readahead_size == 0) { - // Implicit auto readahead - num_file_reads_++; - if (num_file_reads_ > - BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { - if (!rep->file->use_direct_io() && - (data_block_handle.offset() + - static_cast(block_size(data_block_handle)) > - readahead_limit_)) { - // Buffered I/O - // Discarding the return status of Prefetch calls intentionally, as - // we can fallback to reading from disk if Prefetch fails. - rep->file->Prefetch(data_block_handle.offset(), readahead_size_); - readahead_limit_ = static_cast(data_block_handle.offset() + - readahead_size_); - // Keep exponentially increasing readahead size until - // kMaxAutoReadaheadSize. 
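// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It shows the readahead growth pattern the comment above
// describes; the 8 KiB initial size and 256 KiB cap are assumptions standing
// in for kInitAutoReadaheadSize and kMaxAutoReadaheadSize, not authoritative
// values.
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t kInit = 8 * 1024;   // assumed initial auto-readahead size
  const std::size_t kMax = 256 * 1024;  // assumed cap
  std::size_t readahead_size = kInit;
  // Each buffered read past the current prefetch limit doubles the readahead,
  // capped at kMax: 8K, 16K, 32K, 64K, 128K, 256K, 256K, ...
  for (int read = 0; read < 8; ++read) {
    std::printf("read %d: readahead %zu bytes\n", read, readahead_size);
    readahead_size = std::min(kMax, readahead_size * 2);
  }
  return 0;
}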
- readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize, - readahead_size_ * 2); - } else if (rep->file->use_direct_io() && !prefetch_buffer_) { - // Direct I/O - // Let FilePrefetchBuffer take care of the readahead. - rep->CreateFilePrefetchBuffer( - BlockBasedTable::kInitAutoReadaheadSize, - BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); - } - } - } else if (!prefetch_buffer_) { - // Explicit user requested readahead - // The actual condition is: - // if (read_options_.readahead_size != 0 && !prefetch_buffer_) - rep->CreateFilePrefetchBuffer(read_options_.readahead_size, - read_options_.readahead_size, - &prefetch_buffer_); - } - } else if (!prefetch_buffer_) { - rep->CreateFilePrefetchBuffer(compaction_readahead_size_, - compaction_readahead_size_, - &prefetch_buffer_); - } - - Status s; - table_->NewDataBlockIterator( - read_options_, data_block_handle, &block_iter_, block_type_, - /*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(), - /*for_compaction=*/lookup_context_.caller == - TableReaderCaller::kCompaction); - block_iter_points_to_real_block_ = true; - CheckDataBlockWithinUpperBound(); - } -} - -template -bool BlockBasedTableIterator::MaterializeCurrentBlock() { - assert(is_at_first_key_from_index_); - assert(!block_iter_points_to_real_block_); - assert(index_iter_->Valid()); - - is_at_first_key_from_index_ = false; - InitDataBlock(); - assert(block_iter_points_to_real_block_); - block_iter_.SeekToFirst(); - - if (!block_iter_.Valid() || - icomp_.Compare(block_iter_.key(), - index_iter_->value().first_internal_key) != 0) { - // Uh oh. - block_iter_.Invalidate(Status::Corruption( - "first key in index doesn't match first key in block")); - return false; - } - - return true; -} - -template -void BlockBasedTableIterator::FindKeyForward() { - // This method's code is kept short to make it likely to be inlined. - - assert(!is_out_of_bound_); - assert(block_iter_points_to_real_block_); - - if (!block_iter_.Valid()) { - // This is the only call site of FindBlockForward(), but it's extracted into - // a separate method to keep FindKeyForward() short and likely to be - // inlined. When transitioning to a different block, we call - // FindBlockForward(), which is much longer and is probably not inlined. - FindBlockForward(); - } else { - // This is the fast path that avoids a function call. - } -} - -template -void BlockBasedTableIterator::FindBlockForward() { - // TODO the while loop inherits from two-level-iterator. We don't know - // whether a block can be empty so it can be replaced by an "if". - do { - if (!block_iter_.status().ok()) { - return; - } - // Whether next data block is out of upper bound, if there is one. - const bool next_block_is_out_of_bound = - read_options_.iterate_upper_bound != nullptr && - block_iter_points_to_real_block_ && !data_block_within_upper_bound_; - assert(!next_block_is_out_of_bound || - user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, - index_iter_->user_key(), /*b_has_ts=*/true) <= 0); - ResetDataIter(); - index_iter_->Next(); - if (next_block_is_out_of_bound) { - // The next block is out of bound. No need to read it. - TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound", nullptr); - // We need to make sure this is not the last data block before setting - // is_out_of_bound_, since the index key for the last data block can be - // larger than smallest key of the next file on the same level. 
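// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It restates the "skip the next block" rule from the comment
// above with std::string user keys; ToyAdvance and its parameters are
// made-up names for the example.
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// index_keys: one separator user key per data block; pos: block just
// exhausted by the forward scan. Returns true if the iterator should report
// out-of-bound instead of loading another block.
bool ToyAdvance(const std::vector<std::string>& index_keys, std::size_t pos,
                const std::optional<std::string>& upper_bound) {
  // The next block is out of bound if the block we just finished already
  // ends at or past the upper bound.
  const bool next_block_is_out_of_bound =
      upper_bound.has_value() && index_keys[pos] >= *upper_bound;
  ++pos;  // index_iter_->Next()
  // Only report out-of-bound when this was not the last block of the file:
  // the last index key may exceed the smallest key of the next file, so the
  // caller must still be allowed to advance to that file.
  return next_block_is_out_of_bound && pos < index_keys.size();
}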
- if (index_iter_->Valid()) { - is_out_of_bound_ = true; - } - return; - } - - if (!index_iter_->Valid()) { - return; - } - - IndexValue v = index_iter_->value(); - - // TODO(kolmike): Remove the != kBlockCacheTier condition. - if (!v.first_internal_key.empty() && - read_options_.read_tier != kBlockCacheTier) { - // Index contains the first key of the block. Defer reading the block. - is_at_first_key_from_index_ = true; - return; - } - - InitDataBlock(); - block_iter_.SeekToFirst(); - } while (!block_iter_.Valid()); -} - -template -void BlockBasedTableIterator::FindKeyBackward() { - while (!block_iter_.Valid()) { - if (!block_iter_.status().ok()) { - return; - } - - ResetDataIter(); - index_iter_->Prev(); - - if (index_iter_->Valid()) { - InitDataBlock(); - block_iter_.SeekToLast(); - } else { - return; - } - } - - // We could have check lower bound here too, but we opt not to do it for - // code simplicity. -} - -template -void BlockBasedTableIterator::CheckOutOfBound() { - if (read_options_.iterate_upper_bound != nullptr && Valid()) { - is_out_of_bound_ = - user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(), - /*b_has_ts=*/true) <= 0; - } -} - -template -void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() { - if (read_options_.iterate_upper_bound != nullptr && - block_iter_points_to_real_block_) { - data_block_within_upper_bound_ = - (user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, - index_iter_->user_key(), - /*b_has_ts=*/true) > 0); - } -} InternalIterator* BlockBasedTable::NewIterator( const ReadOptions& read_options, const SliceTransform* prefix_extractor, diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index d04325bece..536fad2c06 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -9,38 +9,18 @@ #pragma once -#include -#include -#include -#include -#include -#include - #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" -#include "file/random_access_file_reader.h" -#include "options/cf_options.h" -#include "rocksdb/options.h" -#include "rocksdb/persistent_cache.h" -#include "rocksdb/statistics.h" -#include "rocksdb/status.h" -#include "rocksdb/table.h" -#include "table/block_based/block.h" +#include "table/block_based/cachable_entry.h" #include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_type.h" -#include "table/block_based/cachable_entry.h" #include "table/block_based/filter_block.h" #include "table/block_based/uncompression_dict_reader.h" -#include "table/format.h" -#include "table/get_context.h" -#include "table/multiget_context.h" -#include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "table/table_reader.h" #include "table/two_level_iterator.h" + #include "trace_replay/block_cache_tracer.h" -#include "util/coding.h" -#include "util/user_comparator_wrapper.h" namespace ROCKSDB_NAMESPACE { @@ -617,211 +597,4 @@ struct BlockBasedTable::Rep { !ioptions.allow_mmap_reads /* enable */)); } }; - -// Iterates over the contents of BlockBasedTable. 
-template -class BlockBasedTableIterator : public InternalIteratorBase { - // compaction_readahead_size: its value will only be used if for_compaction = - // true - public: - BlockBasedTableIterator(const BlockBasedTable* table, - const ReadOptions& read_options, - const InternalKeyComparator& icomp, - InternalIteratorBase* index_iter, - bool check_filter, bool need_upper_bound_check, - const SliceTransform* prefix_extractor, - BlockType block_type, TableReaderCaller caller, - size_t compaction_readahead_size = 0) - : table_(table), - read_options_(read_options), - icomp_(icomp), - user_comparator_(icomp.user_comparator()), - index_iter_(index_iter), - pinned_iters_mgr_(nullptr), - block_iter_points_to_real_block_(false), - check_filter_(check_filter), - need_upper_bound_check_(need_upper_bound_check), - prefix_extractor_(prefix_extractor), - block_type_(block_type), - lookup_context_(caller), - compaction_readahead_size_(compaction_readahead_size) {} - - ~BlockBasedTableIterator() { delete index_iter_; } - - void Seek(const Slice& target) override; - void SeekForPrev(const Slice& target) override; - void SeekToFirst() override; - void SeekToLast() override; - void Next() final override; - bool NextAndGetResult(IterateResult* result) override; - void Prev() override; - bool Valid() const override { - return !is_out_of_bound_ && - (is_at_first_key_from_index_ || - (block_iter_points_to_real_block_ && block_iter_.Valid())); - } - Slice key() const override { - assert(Valid()); - if (is_at_first_key_from_index_) { - return index_iter_->value().first_internal_key; - } else { - return block_iter_.key(); - } - } - Slice user_key() const override { - assert(Valid()); - if (is_at_first_key_from_index_) { - return ExtractUserKey(index_iter_->value().first_internal_key); - } else { - return block_iter_.user_key(); - } - } - TValue value() const override { - assert(Valid()); - - // Load current block if not loaded. - if (is_at_first_key_from_index_ && - !const_cast(this) - ->MaterializeCurrentBlock()) { - // Oops, index is not consistent with block contents, but we have - // no good way to report error at this point. Let's return empty value. - return TValue(); - } - - return block_iter_.value(); - } - Status status() const override { - // Prefix index set status to NotFound when the prefix does not exist - if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) { - return index_iter_->status(); - } else if (block_iter_points_to_real_block_) { - return block_iter_.status(); - } else { - return Status::OK(); - } - } - - // Whether iterator invalidated for being out of bound. - bool IsOutOfBound() override { return is_out_of_bound_; } - - inline bool MayBeOutOfUpperBound() override { - assert(Valid()); - return !data_block_within_upper_bound_; - } - - void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { - pinned_iters_mgr_ = pinned_iters_mgr; - } - bool IsKeyPinned() const override { - // Our key comes either from block_iter_'s current key - // or index_iter_'s current *value*. - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) || - (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned())); - } - bool IsValuePinned() const override { - // Load current block if not loaded. - if (is_at_first_key_from_index_) { - const_cast(this)->MaterializeCurrentBlock(); - } - // BlockIter::IsValuePinned() is always true. 
No need to check - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - block_iter_points_to_real_block_; - } - - void ResetDataIter() { - if (block_iter_points_to_real_block_) { - if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) { - block_iter_.DelegateCleanupsTo(pinned_iters_mgr_); - } - block_iter_.Invalidate(Status::OK()); - block_iter_points_to_real_block_ = false; - } - } - - void SavePrevIndexValue() { - if (block_iter_points_to_real_block_) { - // Reseek. If they end up with the same data block, we shouldn't re-fetch - // the same data block. - prev_block_offset_ = index_iter_->value().handle.offset(); - } - } - - private: - enum class IterDirection { - kForward, - kBackward, - }; - - const BlockBasedTable* table_; - const ReadOptions read_options_; - const InternalKeyComparator& icomp_; - UserComparatorWrapper user_comparator_; - InternalIteratorBase* index_iter_; - PinnedIteratorsManager* pinned_iters_mgr_; - TBlockIter block_iter_; - - // True if block_iter_ is initialized and points to the same block - // as index iterator. - bool block_iter_points_to_real_block_; - // See InternalIteratorBase::IsOutOfBound(). - bool is_out_of_bound_ = false; - // Whether current data block being fully within iterate upper bound. - bool data_block_within_upper_bound_ = false; - // True if we're standing at the first key of a block, and we haven't loaded - // that block yet. A call to value() will trigger loading the block. - bool is_at_first_key_from_index_ = false; - bool check_filter_; - // TODO(Zhongyi): pick a better name - bool need_upper_bound_check_; - const SliceTransform* prefix_extractor_; - BlockType block_type_; - uint64_t prev_block_offset_ = std::numeric_limits::max(); - BlockCacheLookupContext lookup_context_; - // Readahead size used in compaction, its value is used only if - // lookup_context_.caller = kCompaction. - size_t compaction_readahead_size_; - - size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; - size_t readahead_limit_ = 0; - int64_t num_file_reads_ = 0; - std::unique_ptr prefetch_buffer_; - - // If `target` is null, seek to first. - void SeekImpl(const Slice* target); - - void InitDataBlock(); - bool MaterializeCurrentBlock(); - void FindKeyForward(); - void FindBlockForward(); - void FindKeyBackward(); - void CheckOutOfBound(); - - // Check if data block is fully within iterate_upper_bound. - // - // Note MyRocks may update iterate bounds between seek. To workaround it, - // we need to check and update data_block_within_upper_bound_ accordingly. - void CheckDataBlockWithinUpperBound(); - - bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) { - if (need_upper_bound_check_ && direction == IterDirection::kBackward) { - // Upper bound check isn't sufficnet for backward direction to - // guarantee the same result as total order, so disable prefix - // check. - return true; - } - if (check_filter_ && - !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_, - need_upper_bound_check_, &lookup_context_)) { - // TODO remember the iterator is invalidated because of prefix - // match. This can avoid the upper level file iterator to falsely - // believe the position is the end of the SST file and move to - // the first key of the next file. 
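// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It shows the user-side setup under which CheckPrefixMayMatch()
// above takes effect; the database path, key, and prefix length are made up
// for the example.
#include <cassert>
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/slice_transform.h"

void ToyPrefixScan() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // A prefix extractor enables the prefix bloom / prefix index paths that
  // PrefixMayMatch() consults when check_filter_ is set.
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/prefix_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ro;
  // Leaving total_order_seek off keeps the prefix check active; a Seek() to a
  // prefix that the filter rules out invalidates the table iterator, as the
  // comment above describes. prefix_same_as_start additionally stops the
  // scan once the prefix changes.
  ro.prefix_same_as_start = true;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->Seek("user1234"); it->Valid(); it->Next()) {
    // ... visit keys sharing the 4-byte prefix "user" ...
  }
  it.reset();
  delete db;
}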
- ResetDataIter(); - return false; - } - return true; - } -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader_impl.h b/table/block_based/block_based_table_reader_impl.h new file mode 100644 index 0000000000..d9cfaa92ca --- /dev/null +++ b/table/block_based/block_based_table_reader_impl.h @@ -0,0 +1,190 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/reader_common.h" + +// The file contains some member functions of BlockBasedTable that +// cannot be implemented in block_based_table_reader.cc because +// it's called by other files (e.g. block_based_iterator.h) and +// are templates. + +namespace ROCKSDB_NAMESPACE { +// Convert an index iterator value (i.e., an encoded BlockHandle) +// into an iterator over the contents of the corresponding block. +// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it +template +TBlockIter* BlockBasedTable::NewDataBlockIterator( + const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter, + BlockType block_type, GetContext* get_context, + BlockCacheLookupContext* lookup_context, Status s, + FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const { + PERF_TIMER_GUARD(new_table_block_iter_nanos); + + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + CachableEntry uncompression_dict; + if (rep_->uncompression_dict_reader) { + const bool no_io = (ro.read_tier == kBlockCacheTier); + s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + prefetch_buffer, no_io, get_context, lookup_context, + &uncompression_dict); + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + } + + const UncompressionDict& dict = uncompression_dict.GetValue() + ? *uncompression_dict.GetValue() + : UncompressionDict::GetEmptyDict(); + + CachableEntry block; + s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type, + get_context, lookup_context, for_compaction, + /* use_cache */ true); + + if (!s.ok()) { + assert(block.IsEmpty()); + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. 
+ const bool block_contents_pinned = + block.IsCached() || + (!block.GetValue()->own_bytes() && rep_->immortal_table); + iter = InitBlockIterator(rep_, block.GetValue(), block_type, iter, + block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep_->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep_->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep_->cache_key_prefix_size != 0); + assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); + + if (s.ok()) { + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); + } + } + } else { + iter->SetCacheHandle(block.GetCacheHandle()); + } + + block.TransferTo(iter); + + return iter; +} + +// Convert an uncompressed data block (i.e CachableEntry) +// into an iterator over the contents of the corresponding block. +// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it +template +TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro, + CachableEntry& block, + TBlockIter* input_iter, + Status s) const { + PERF_TIMER_GUARD(new_table_block_iter_nanos); + + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. + const bool block_contents_pinned = + block.IsCached() || + (!block.GetValue()->own_bytes() && rep_->immortal_table); + iter = InitBlockIterator(rep_, block.GetValue(), BlockType::kData, + iter, block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep_->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. 
Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep_->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep_->cache_key_prefix_size != 0); + assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); + if (s.ok()) { + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); + } + } + } else { + iter->SetCacheHandle(block.GetCacheHandle()); + } + + block.TransferTo(iter); + return iter; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/hash_index_reader.cc b/table/block_based/hash_index_reader.cc new file mode 100644 index 0000000000..c1648bbe18 --- /dev/null +++ b/table/block_based/hash_index_reader.cc @@ -0,0 +1,146 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/hash_index_reader.h" + +#include "table/block_fetcher.h" +#include "table/meta_blocks.h" + +namespace ROCKSDB_NAMESPACE { +Status HashIndexReader::Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(index_reader != nullptr); + assert(!pin || prefetch); + + const BlockBasedTable::Rep* rep = table->get_rep(); + assert(rep != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + // Note, failure to create prefix hash index does not need to be a + // hard error. We can still fall back to the original binary search index. + // So, Create will succeed regardless, from this point on. 
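// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It shows the table configuration that writes the prefix-hash
// index this reader consumes; the prefix length is an arbitrary example
// value.
#include "rocksdb/db.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"

rocksdb::Options MakeHashIndexOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  // kHashSearch emits the kHashIndexPrefixesBlock and
  // kHashIndexPrefixesMetadataBlock meta blocks that Create() looks up above;
  // if they are missing, the reader silently falls back to binary search.
  table_options.index_type = rocksdb::BlockBasedTableOptions::kHashSearch;

  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  // The hash index is built over key prefixes, so a prefix extractor is
  // required.
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(8));
  return options;
}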
+ + index_reader->reset(new HashIndexReader(table, std::move(index_block))); + + // Get prefixes block + BlockHandle prefixes_handle; + Status s = + FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, &prefixes_handle); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + // Get index metadata block + BlockHandle prefixes_meta_handle; + s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock, + &prefixes_meta_handle); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + RandomAccessFileReader* const file = rep->file.get(); + const Footer& footer = rep->footer; + const ImmutableCFOptions& ioptions = rep->ioptions; + const PersistentCacheOptions& cache_options = rep->persistent_cache_options; + MemoryAllocator* const memory_allocator = + GetMemoryAllocator(rep->table_options); + + // Read contents for the blocks + BlockContents prefixes_contents; + BlockFetcher prefixes_block_fetcher( + file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, + &prefixes_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, BlockType::kHashIndexPrefixes, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = prefixes_block_fetcher.ReadBlockContents(); + if (!s.ok()) { + return s; + } + BlockContents prefixes_meta_contents; + BlockFetcher prefixes_meta_block_fetcher( + file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, + &prefixes_meta_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, BlockType::kHashIndexMetadata, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = prefixes_meta_block_fetcher.ReadBlockContents(); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + BlockPrefixIndex* prefix_index = nullptr; + assert(rep->internal_prefix_transform.get() != nullptr); + s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), + prefixes_contents.data, + prefixes_meta_contents.data, &prefix_index); + // TODO: log error + if (s.ok()) { + HashIndexReader* const hash_index_reader = + static_cast(index_reader->get()); + hash_index_reader->prefix_index_.reset(prefix_index); + } + + return Status::OK(); +} + +InternalIteratorBase* HashIndexReader::NewIterator( + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const BlockBasedTable::Rep* rep = table()->get_rep(); + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + Statistics* kNullStats = nullptr; + const bool total_order_seek = + read_options.total_order_seek || disable_prefix_seek; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. 
+ auto it = index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, + total_order_seek, index_has_first_key(), index_key_includes_seq(), + index_value_is_full(), false /* block_contents_pinned */, + prefix_index_.get()); + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/hash_index_reader.h b/table/block_based/hash_index_reader.h new file mode 100644 index 0000000000..fecd1e5c8b --- /dev/null +++ b/table/block_based/hash_index_reader.h @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Index that leverages an internal hash table to quicken the lookup for a given +// key. +class HashIndexReader : public BlockBasedTable::IndexReaderCommon { + public: + static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + if (prefix_index_) { + usage += prefix_index_->ApproximateMemoryUsage(); + } + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return usage; + } + + private: + HashIndexReader(const BlockBasedTable* t, CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} + + std::unique_ptr prefix_index_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/index_reader_common.cc b/table/block_based/index_reader_common.cc new file mode 100644 index 0000000000..76f894d59f --- /dev/null +++ b/table/block_based/index_reader_common.cc @@ -0,0 +1,54 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, bool use_cache, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) { + PERF_TIMER_GUARD(read_index_block_nanos); + + assert(table != nullptr); + assert(index_block != nullptr); + assert(index_block->IsEmpty()); + + const Rep* const rep = table->get_rep(); + assert(rep != nullptr); + + const Status s = table->RetrieveBlock( + prefetch_buffer, read_options, rep->footer.index_handle(), + UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex, + get_context, lookup_context, /* for_compaction */ false, use_cache); + + return s; +} + +Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( + bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) const { + assert(index_block != nullptr); + + if (!index_block_.IsEmpty()) { + index_block->SetUnownedValue(index_block_.GetValue()); + return Status::OK(); + } + + ReadOptions read_options; + if (no_io) { + read_options.read_tier = kBlockCacheTier; + } + + return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options, + cache_index_blocks(), get_context, lookup_context, + index_block); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/index_reader_common.h b/table/block_based/index_reader_common.h new file mode 100644 index 0000000000..71174a7d30 --- /dev/null +++ b/table/block_based/index_reader_common.h @@ -0,0 +1,85 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Encapsulates common functionality for the various index reader +// implementations. Provides access to the index block regardless of whether +// it is owned by the reader or stored in the cache, or whether it is pinned +// in the cache or not. 
+class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { + public: + IndexReaderCommon(const BlockBasedTable* t, + CachableEntry&& index_block) + : table_(t), index_block_(std::move(index_block)) { + assert(table_ != nullptr); + } + + protected: + static Status ReadIndexBlock(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, bool use_cache, + GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block); + + const BlockBasedTable* table() const { return table_; } + + const InternalKeyComparator* internal_comparator() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + return &table_->get_rep()->internal_comparator; + } + + bool index_has_first_key() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_has_first_key; + } + + bool index_key_includes_seq() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_key_includes_seq; + } + + bool index_value_is_full() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_value_is_full; + } + + bool cache_index_blocks() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->table_options.cache_index_and_filter_blocks; + } + + Status GetOrReadIndexBlock(bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) const; + + size_t ApproximateIndexBlockMemoryUsage() const { + assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); + return index_block_.GetOwnValue() + ? index_block_.GetValue()->ApproximateMemoryUsage() + : 0; + } + + private: + const BlockBasedTable* table_; + CachableEntry index_block_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc new file mode 100644 index 0000000000..aacbb60533 --- /dev/null +++ b/table/block_based/partitioned_index_reader.cc @@ -0,0 +1,175 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "table/block_based/partitioned_index_reader.h" +#include "table/block_based/block_based_table_iterator.h" + +namespace ROCKSDB_NAMESPACE { +Status PartitionIndexReader::Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + index_reader->reset(new PartitionIndexReader(table, std::move(index_block))); + + return Status::OK(); +} + +InternalIteratorBase* PartitionIndexReader::NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + const BlockBasedTable::Rep* rep = table()->rep_; + InternalIteratorBase* it = nullptr; + + Statistics* kNullStats = nullptr; + // Filters are already checked before seeking the index + if (!partition_map_.empty()) { + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + it = NewTwoLevelIterator( + new BlockBasedTable::PartitionedIndexIteratorState(table(), + &partition_map_), + index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), + index_value_is_full())); + } else { + ReadOptions ro; + ro.fill_cache = read_options.fill_cache; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + it = new BlockBasedTableIterator( + table(), ro, *internal_comparator(), + index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), + index_value_is_full()), + false, true, /* prefix_extractor */ nullptr, BlockType::kIndex, + lookup_context ? lookup_context->caller + : TableReaderCaller::kUncategorized); + } + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + + // TODO(myabandeh): Update TwoLevelIterator to be able to make use of + // on-stack BlockIter while the state is on heap. Currentlly it assumes + // the first level iter is always on heap and will attempt to delete it + // in its destructor. 
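// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It shows the table configuration under which the two-level
// NewIterator() path above is exercised; the sizes are arbitrary example
// values.
#include "rocksdb/filter_policy.h"
#include "rocksdb/table.h"

rocksdb::BlockBasedTableOptions MakePartitionedIndexOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  // Two-level index: a top-level index over index partitions, each partition
  // read (and optionally pinned) by PartitionIndexReader.
  table_options.index_type =
      rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;
  // Target size of each index partition.
  table_options.metadata_block_size = 4096;
  // Optionally partition the filters along with the index; this needs a
  // full-filter policy to take effect.
  table_options.partition_filters = true;
  table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
  return table_options;
}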
+}
+void PartitionIndexReader::CacheDependencies(bool pin) {
+  // Before reading partitions, prefetch them to avoid lots of IOs
+  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
+  const BlockBasedTable::Rep* rep = table()->rep_;
+  IndexBlockIter biter;
+  BlockHandle handle;
+  Statistics* kNullStats = nullptr;
+
+  CachableEntry<Block> index_block;
+  Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
+                                 &lookup_context, &index_block);
+  if (!s.ok()) {
+    ROCKS_LOG_WARN(rep->ioptions.info_log,
+                   "Error retrieving top-level index block while trying to "
+                   "cache index partitions: %s",
+                   s.ToString().c_str());
+    return;
+  }
+
+  // We don't return pinned data from index blocks, so no need
+  // to set `block_contents_pinned`.
+  index_block.GetValue()->NewIndexIterator(
+      internal_comparator(), internal_comparator()->user_comparator(),
+      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
+      index_has_first_key(), index_key_includes_seq(), index_value_is_full());
+  // Index partitions are assumed to be consecutive. Prefetch them all.
+  // Read the first block offset
+  biter.SeekToFirst();
+  if (!biter.Valid()) {
+    // Empty index.
+    return;
+  }
+  handle = biter.value().handle;
+  uint64_t prefetch_off = handle.offset();
+
+  // Read the last block's offset
+  biter.SeekToLast();
+  if (!biter.Valid()) {
+    // Empty index.
+    return;
+  }
+  handle = biter.value().handle;
+  uint64_t last_off = handle.offset() + block_size(handle);
+  uint64_t prefetch_len = last_off - prefetch_off;
+  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
+  rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer);
+  s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
+                                static_cast<size_t>(prefetch_len));
+
+  // After prefetch, read the partitions one by one
+  biter.SeekToFirst();
+  auto ro = ReadOptions();
+  for (; biter.Valid(); biter.Next()) {
+    handle = biter.value().handle;
+    CachableEntry<Block> block;
+    // TODO: Support counter batch update for partitioned index and
+    // filter blocks
+    s = table()->MaybeReadBlockAndLoadToCache(
+        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
+        &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
+        /*contents=*/nullptr);
+
+    assert(s.ok() || block.GetValue() == nullptr);
+    if (s.ok() && block.GetValue() != nullptr) {
+      if (block.IsCached()) {
+        if (pin) {
+          partition_map_[handle.offset()] = std::move(block);
+        }
+      }
+    }
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/table/block_based/partitioned_index_reader.h b/table/block_based/partitioned_index_reader.h
new file mode 100644
index 0000000000..86397fd58f
--- /dev/null
+++ b/table/block_based/partitioned_index_reader.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+#include "table/block_based/index_reader_common.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Index that allows binary search lookup in a two-level index structure.
+class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
+ public:
+  // Read the partition index from the file and create an instance for
+  // `PartitionIndexReader`.
+ // On success, index_reader will be populated; otherwise it will remain + // unmodified. + static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + // return a two-level iterator: first level is on the partition index + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + void CacheDependencies(bool pin) override; + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + // TODO(myabandeh): more accurate estimate of partition_map_ mem usage + return usage; + } + + private: + PartitionIndexReader(const BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} + + std::unordered_map> partition_map_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/reader_common.cc b/table/block_based/reader_common.cc new file mode 100644 index 0000000000..4164e4ce16 --- /dev/null +++ b/table/block_based/reader_common.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/reader_common.h" + +#include "util/crc32c.h" +#include "util/xxhash.h" + +namespace ROCKSDB_NAMESPACE { +void ForceReleaseCachedEntry(void* arg, void* h) { + Cache* cache = reinterpret_cast(arg); + Cache::Handle* handle = reinterpret_cast(h); + cache->Release(handle, true /* force_erase */); +} + +Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, + uint32_t expected) { + Status s; + uint32_t actual = 0; + switch (type) { + case kNoChecksum: + break; + case kCRC32c: + expected = crc32c::Unmask(expected); + actual = crc32c::Value(buf, len); + break; + case kxxHash: + actual = XXH32(buf, static_cast(len), 0); + break; + case kxxHash64: + actual = static_cast(XXH64(buf, static_cast(len), 0) & + uint64_t{0xffffffff}); + break; + default: + s = Status::Corruption("unknown checksum type"); + } + if (s.ok() && actual != expected) { + s = Status::Corruption("properties block checksum mismatched"); + } + return s; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/reader_common.h b/table/block_based/reader_common.h new file mode 100644 index 0000000000..8fa68d49c0 --- /dev/null +++ b/table/block_based/reader_common.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. +#pragma once + +#include "rocksdb/table.h" + +namespace ROCKSDB_NAMESPACE { +// Release the cached entry and decrement its ref count. +extern void ForceReleaseCachedEntry(void* arg, void* h); + +inline MemoryAllocator* GetMemoryAllocator( + const BlockBasedTableOptions& table_options) { + return table_options.block_cache.get() + ? table_options.block_cache->memory_allocator() + : nullptr; +} + +inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( + const BlockBasedTableOptions& table_options) { + return table_options.block_cache_compressed.get() + ? table_options.block_cache_compressed->memory_allocator() + : nullptr; +} + +extern Status VerifyChecksum(const ChecksumType type, const char* buf, + size_t len, uint32_t expected); +} // namespace ROCKSDB_NAMESPACE
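// Editor's note: the sketch below is illustrative only and is not part of
// this patch. It shows one way the VerifyChecksum() helper declared above
// might be called from inside the RocksDB source tree; the payload here is
// made up, and in real use `expected` comes from a block trailer.
#include <cstdint>
#include <cstring>
#include "rocksdb/status.h"
#include "table/block_based/reader_common.h"
#include "util/crc32c.h"

rocksdb::Status ToyVerify() {
  const char data[] = "block payload bytes";
  const std::size_t len = std::strlen(data);
  // SST block trailers store the masked crc32c of the payload;
  // VerifyChecksum() unmasks the expected value before comparing.
  const uint32_t stored =
      rocksdb::crc32c::Mask(rocksdb::crc32c::Value(data, len));
  return rocksdb::VerifyChecksum(rocksdb::kCRC32c, data, len, stored);
}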