diff --git a/db/db_test2.cc b/db/db_test2.cc index ca8986c4d8..c223f2b008 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2282,12 +2282,15 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { // chose 1MB as the upper bound on the total bytes read. size_t rate_limited_bytes = options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW); - ASSERT_GE( - rate_limited_bytes, - static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files)); + // Include the explict prefetch of the footer in direct I/O case. + size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0; + ASSERT_GE(rate_limited_bytes, + static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files + + direct_io_extra)); ASSERT_LT( rate_limited_bytes, - static_cast(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files)); + static_cast(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files + + direct_io_extra)); Iterator* iter = db_->NewIterator(ReadOptions()); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { diff --git a/table/adaptive_table_factory.cc b/table/adaptive_table_factory.cc index f83905dff3..47069f8669 100644 --- a/table/adaptive_table_factory.cc +++ b/table/adaptive_table_factory.cc @@ -46,7 +46,8 @@ Status AdaptiveTableFactory::NewTableReader( unique_ptr* table, bool prefetch_index_and_filter_in_cache) const { Footer footer; - auto s = ReadFooterFromFile(file.get(), file_size, &footer); + auto s = ReadFooterFromFile(file.get(), nullptr /* prefetch_buffer */, + file_size, &footer); if (!s.ok()) { return s; } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 5931692f02..89e0c73549 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -70,17 +70,17 @@ namespace { // On success fill *result and return OK - caller owns *result // @param compression_dict Data for presetting the compression library's // dictionary. -Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& options, const BlockHandle& handle, - std::unique_ptr* result, - const ImmutableCFOptions& ioptions, bool do_uncompress, - const Slice& compression_dict, - const PersistentCacheOptions& cache_options, - SequenceNumber global_seqno, - size_t read_amp_bytes_per_bit) { +Status ReadBlockFromFile( + RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, + const Footer& footer, const ReadOptions& options, const BlockHandle& handle, + std::unique_ptr* result, const ImmutableCFOptions& ioptions, + bool do_uncompress, const Slice& compression_dict, + const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, + size_t read_amp_bytes_per_bit) { BlockContents contents; - Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions, - do_uncompress, compression_dict, cache_options); + Status s = ReadBlockContents(file, prefetch_buffer, footer, options, handle, + &contents, ioptions, do_uncompress, + compression_dict, cache_options); if (s.ok()) { result->reset(new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit, ioptions.statistics)); @@ -157,6 +157,7 @@ class PartitionIndexReader : public IndexReader, public Cleanable { // On success, index_reader will be populated; otherwise it will remain // unmodified. static Status Create(BlockBasedTable* table, RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const BlockHandle& index_handle, const ImmutableCFOptions& ioptions, const InternalKeyComparator* icomparator, @@ -165,8 +166,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable { const int level) { std::unique_ptr index_block; auto s = ReadBlockFromFile( - file, footer, ReadOptions(), index_handle, &index_block, ioptions, - true /* decompress */, Slice() /*compression dict*/, cache_options, + file, prefetch_buffer, footer, ReadOptions(), index_handle, + &index_block, ioptions, true /* decompress */, + Slice() /*compression dict*/, cache_options, kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); if (s.ok()) { @@ -238,16 +240,18 @@ class BinarySearchIndexReader : public IndexReader { // `BinarySearchIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. - static Status Create(RandomAccessFileReader* file, const Footer& footer, - const BlockHandle& index_handle, + static Status Create(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, + const Footer& footer, const BlockHandle& index_handle, const ImmutableCFOptions& ioptions, const InternalKeyComparator* icomparator, IndexReader** index_reader, const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile( - file, footer, ReadOptions(), index_handle, &index_block, ioptions, - true /* decompress */, Slice() /*compression dict*/, cache_options, + file, prefetch_buffer, footer, ReadOptions(), index_handle, + &index_block, ioptions, true /* decompress */, + Slice() /*compression dict*/, cache_options, kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); if (s.ok()) { @@ -289,6 +293,7 @@ class HashIndexReader : public IndexReader { public: static Status Create(const SliceTransform* hash_key_extractor, const Footer& footer, RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, const ImmutableCFOptions& ioptions, const InternalKeyComparator* icomparator, const BlockHandle& index_handle, @@ -298,8 +303,9 @@ class HashIndexReader : public IndexReader { const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile( - file, footer, ReadOptions(), index_handle, &index_block, ioptions, - true /* decompress */, Slice() /*compression dict*/, cache_options, + file, prefetch_buffer, footer, ReadOptions(), index_handle, + &index_block, ioptions, true /* decompress */, + Slice() /*compression dict*/, cache_options, kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); if (!s.ok()) { @@ -335,15 +341,17 @@ class HashIndexReader : public IndexReader { // Read contents for the blocks BlockContents prefixes_contents; - s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, ioptions, true /* decompress */, - Slice() /*compression dict*/, cache_options); + s = ReadBlockContents(file, prefetch_buffer, footer, ReadOptions(), + prefixes_handle, &prefixes_contents, ioptions, + true /* decompress */, Slice() /*compression dict*/, + cache_options); if (!s.ok()) { return s; } BlockContents prefixes_meta_contents; - s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, ioptions, true /* decompress */, + s = ReadBlockContents(file, prefetch_buffer, footer, ReadOptions(), + prefixes_meta_handle, &prefixes_meta_contents, + ioptions, true /* decompress */, Slice() /*compression dict*/, cache_options); if (!s.ok()) { // TODO: log error @@ -535,12 +543,29 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, Footer footer; + std::unique_ptr prefetch_buffer; + // Before read footer, readahead backwards to prefetch data - Status s = - file->Prefetch((file_size < 512 * 1024 ? 0 : file_size - 512 * 1024), - 512 * 1024 /* 512 KB prefetching */); - s = ReadFooterFromFile(file.get(), file_size, &footer, - kBlockBasedTableMagicNumber); + const size_t kTailPrefetchSize = 512 * 1024; + size_t prefetch_off; + size_t prefetch_len; + if (file_size < kTailPrefetchSize) { + prefetch_off = 0; + prefetch_len = file_size; + } else { + prefetch_off = file_size - kTailPrefetchSize; + prefetch_len = kTailPrefetchSize; + } + Status s; + // TODO should not have this special logic in the future. + if (!file->use_direct_io()) { + s = file->Prefetch(prefetch_off, prefetch_len); + } else { + prefetch_buffer.reset(new FilePrefetchBuffer()); + s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len); + } + s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer, + kBlockBasedTableMagicNumber); if (!s.ok()) { return s; } @@ -577,7 +602,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // Read meta index std::unique_ptr meta; std::unique_ptr meta_iter; - s = ReadMetaBlock(rep, &meta, &meta_iter); + s = ReadMetaBlock(rep, prefetch_buffer.get(), &meta, &meta_iter); if (!s.ok()) { return s; } @@ -623,8 +648,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, s = meta_iter->status(); TableProperties* table_properties = nullptr; if (s.ok()) { - s = ReadProperties(meta_iter->value(), rep->file.get(), rep->footer, - rep->ioptions, &table_properties); + s = ReadProperties(meta_iter->value(), rep->file.get(), + prefetch_buffer.get(), rep->footer, rep->ioptions, + &table_properties); } if (!s.ok()) { @@ -655,9 +681,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // TODO(andrewkr): ReadMetaBlock repeats SeekToCompressionDictBlock(). // maybe decode a handle from meta_iter // and do ReadBlockContents(handle) instead - s = rocksdb::ReadMetaBlock(rep->file.get(), file_size, - kBlockBasedTableMagicNumber, rep->ioptions, - rocksdb::kCompressionDictBlock, + s = rocksdb::ReadMetaBlock(rep->file.get(), prefetch_buffer.get(), + file_size, kBlockBasedTableMagicNumber, + rep->ioptions, rocksdb::kCompressionDictBlock, compression_dict_block.get()); if (!s.ok()) { ROCKS_LOG_WARN( @@ -682,6 +708,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, } else { if (found_range_del_block && !rep->range_del_handle.IsNull()) { ReadOptions read_options; + // TODO: try to use prefetched buffer too. s = MaybeLoadDataBlockToCache(rep, read_options, rep->range_del_handle, Slice() /* compression_dict */, &rep->range_del_entry); @@ -753,7 +780,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // pre-load these blocks, which will kept in member variables in Rep // and with a same life-time as this table object. IndexReader* index_reader = nullptr; - s = new_table->CreateIndexReader(&index_reader, meta_iter.get(), level); + s = new_table->CreateIndexReader(prefetch_buffer.get(), &index_reader, + meta_iter.get(), level); if (s.ok()) { rep->index_reader.reset(index_reader); @@ -761,8 +789,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // Set filter block if (rep->filter_policy) { const bool is_a_filter_partition = true; - rep->filter.reset( - new_table->ReadFilter(rep->filter_handle, !is_a_filter_partition)); + rep->filter.reset(new_table->ReadFilter( + prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition)); if (rep->filter.get()) { rep->filter->SetLevel(level); } @@ -816,13 +844,14 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const { // Load the meta-block from the file. On success, return the loaded meta block // and its iterator. Status BlockBasedTable::ReadMetaBlock(Rep* rep, + FilePrefetchBuffer* prefetch_buffer, std::unique_ptr* meta_block, std::unique_ptr* iter) { // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates // it is an empty block. std::unique_ptr meta; Status s = ReadBlockFromFile( - rep->file.get(), rep->footer, ReadOptions(), + rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), rep->footer.metaindex_handle(), &meta, rep->ioptions, true /* decompress */, Slice() /*compression dict*/, rep->persistent_cache_options, kDisableGlobalSequenceNumber, @@ -1021,7 +1050,8 @@ Status BlockBasedTable::PutDataBlockToCache( } FilterBlockReader* BlockBasedTable::ReadFilter( - const BlockHandle& filter_handle, const bool is_a_filter_partition) const { + FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_handle, + const bool is_a_filter_partition) const { auto& rep = rep_; // TODO: We might want to unify with ReadBlockFromFile() if we start // requiring checksum verification in Table::Open. @@ -1029,8 +1059,8 @@ FilterBlockReader* BlockBasedTable::ReadFilter( return nullptr; } BlockContents block; - if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(), - filter_handle, &block, rep->ioptions, + if (!ReadBlockContents(rep->file.get(), prefetch_buffer, rep->footer, + ReadOptions(), filter_handle, &block, rep->ioptions, false /* decompress */, Slice() /*compression dict*/, rep->persistent_cache_options) .ok()) { @@ -1127,7 +1157,8 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( // Do not invoke any io. return CachableEntry(); } else { - filter = ReadFilter(filter_blk_handle, is_a_filter_partition); + filter = ReadFilter(nullptr /* prefetch_buffer */, filter_blk_handle, + is_a_filter_partition); if (filter != nullptr) { assert(filter->size() > 0); Status s = block_cache->Insert( @@ -1195,7 +1226,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator( // Create index reader and put it in the cache. Status s; TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2"); - s = CreateIndexReader(&index_reader); + s = CreateIndexReader(nullptr /* prefetch_buffer */, &index_reader); TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1"); TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3"); TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4"); @@ -1290,10 +1321,11 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } } std::unique_ptr block_value; - s = ReadBlockFromFile( - rep->file.get(), rep->footer, ro, handle, &block_value, rep->ioptions, - true /* compress */, compression_dict, rep->persistent_cache_options, - rep->global_seqno, rep->table_options.read_amp_bytes_per_bit); + s = ReadBlockFromFile(rep->file.get(), nullptr /* prefetch_buffer */, + rep->footer, ro, handle, &block_value, rep->ioptions, + true /* compress */, compression_dict, + rep->persistent_cache_options, rep->global_seqno, + rep->table_options.read_amp_bytes_per_bit); if (s.ok()) { block.value = block_value.release(); } @@ -1360,11 +1392,12 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache( std::unique_ptr raw_block; { StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile( - rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions, - block_cache_compressed == nullptr, compression_dict, - rep->persistent_cache_options, rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit); + s = ReadBlockFromFile(rep->file.get(), nullptr /* prefetch_buffer*/, + rep->footer, ro, handle, &raw_block, + rep->ioptions, block_cache_compressed == nullptr, + compression_dict, rep->persistent_cache_options, + rep->global_seqno, + rep->table_options.read_amp_bytes_per_bit); } if (s.ok()) { @@ -1750,7 +1783,7 @@ Status BlockBasedTable::VerifyChecksum() { // Check Meta blocks std::unique_ptr meta; std::unique_ptr meta_iter; - s = ReadMetaBlock(rep_, &meta, &meta_iter); + s = ReadMetaBlock(rep_, nullptr /* prefetch buffer */, &meta, &meta_iter); if (s.ok()) { s = VerifyChecksumInBlocks(meta_iter.get()); if (!s.ok()) { @@ -1788,9 +1821,10 @@ Status BlockBasedTable::VerifyChecksumInBlocks(InternalIterator* index_iter) { break; } BlockContents contents; - s = ReadBlockContents(rep_->file.get(), rep_->footer, ReadOptions(), - handle, &contents, rep_->ioptions, - false /* decompress */, Slice() /*compression dict*/, + s = ReadBlockContents(rep_->file.get(), nullptr /* prefetch buffer */, + rep_->footer, ReadOptions(), handle, &contents, + rep_->ioptions, false /* decompress */, + Slice() /*compression dict*/, rep_->persistent_cache_options); if (!s.ok()) { break; @@ -1840,8 +1874,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, // 4. internal_comparator // 5. index_type Status BlockBasedTable::CreateIndexReader( - IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter, - int level) { + FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader, + InternalIterator* preloaded_meta_index_iter, int level) { // Some old version of block-based tables don't have index type present in // table properties. If that's the case we can safely use the kBinarySearch. auto index_type_on_file = BlockBasedTableOptions::kBinarySearch; @@ -1869,20 +1903,22 @@ Status BlockBasedTable::CreateIndexReader( switch (index_type_on_file) { case BlockBasedTableOptions::kTwoLevelIndexSearch: { return PartitionIndexReader::Create( - this, file, footer, footer.index_handle(), rep_->ioptions, - icomparator, index_reader, rep_->persistent_cache_options, level); + this, file, prefetch_buffer, footer, footer.index_handle(), + rep_->ioptions, icomparator, index_reader, + rep_->persistent_cache_options, level); } case BlockBasedTableOptions::kBinarySearch: { return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), rep_->ioptions, icomparator, - index_reader, rep_->persistent_cache_options); + file, prefetch_buffer, footer, footer.index_handle(), rep_->ioptions, + icomparator, index_reader, rep_->persistent_cache_options); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; std::unique_ptr meta_iter_guard; auto meta_index_iter = preloaded_meta_index_iter; if (meta_index_iter == nullptr) { - auto s = ReadMetaBlock(rep_, &meta_guard, &meta_iter_guard); + auto s = + ReadMetaBlock(rep_, prefetch_buffer, &meta_guard, &meta_iter_guard); if (!s.ok()) { // we simply fall back to binary search in case there is any // problem with prefix hash index loading. @@ -1890,16 +1926,18 @@ Status BlockBasedTable::CreateIndexReader( "Unable to read the metaindex block." " Fall back to binary search index."); return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), rep_->ioptions, icomparator, - index_reader, rep_->persistent_cache_options); + file, prefetch_buffer, footer, footer.index_handle(), + rep_->ioptions, icomparator, index_reader, + rep_->persistent_cache_options); } meta_index_iter = meta_iter_guard.get(); } return HashIndexReader::Create( - rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions, - icomparator, footer.index_handle(), meta_index_iter, index_reader, - rep_->hash_index_allow_collision, rep_->persistent_cache_options); + rep_->internal_prefix_transform.get(), footer, file, prefetch_buffer, + rep_->ioptions, icomparator, footer.index_handle(), meta_index_iter, + index_reader, rep_->hash_index_allow_collision, + rep_->persistent_cache_options); } default: { std::string error_message = @@ -2015,7 +2053,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { "--------------------------------------\n"); std::unique_ptr meta; std::unique_ptr meta_iter; - Status s = ReadMetaBlock(rep_, &meta, &meta_iter); + Status s = + ReadMetaBlock(rep_, nullptr /* prefetch_buffer */, &meta, &meta_iter); if (s.ok()) { for (meta_iter->SeekToFirst(); meta_iter->Valid(); meta_iter->Next()) { s = meta_iter->status(); @@ -2071,10 +2110,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { BlockHandle handle; if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) { BlockContents block; - if (ReadBlockContents( - rep_->file.get(), rep_->footer, ReadOptions(), handle, &block, - rep_->ioptions, false /*decompress*/, - Slice() /*compression dict*/, rep_->persistent_cache_options) + if (ReadBlockContents(rep_->file.get(), nullptr /* prefetch_buffer */, + rep_->footer, ReadOptions(), handle, &block, + rep_->ioptions, false /*decompress*/, + Slice() /*compression dict*/, + rep_->persistent_cache_options) .ok()) { rep_->filter.reset(new BlockBasedFilterBlockReader( rep_->ioptions.prefix_extractor, table_options, diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 3451614c87..457edce220 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -300,7 +300,7 @@ class BlockBasedTable : public TableReader { // need to access extra meta blocks for index construction. This parameter // helps avoid re-reading meta index block if caller already created one. Status CreateIndexReader( - IndexReader** index_reader, + FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter = nullptr, const int level = -1); @@ -309,13 +309,15 @@ class BlockBasedTable : public TableReader { const bool no_io) const; // Read the meta block from sst. - static Status ReadMetaBlock(Rep* rep, std::unique_ptr* meta_block, + static Status ReadMetaBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer, + std::unique_ptr* meta_block, std::unique_ptr* iter); Status VerifyChecksumInBlocks(InternalIterator* index_iter); // Create the filter from the filter block. - FilterBlockReader* ReadFilter(const BlockHandle& filter_handle, + FilterBlockReader* ReadFilter(FilePrefetchBuffer* prefetch_buffer, + const BlockHandle& filter_handle, const bool is_a_filter_partition) const; static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size); diff --git a/table/format.cc b/table/format.cc index 3e5a191bbf..e5f2df0074 100644 --- a/table/format.cc +++ b/table/format.cc @@ -216,8 +216,10 @@ std::string Footer::ToString() const { return result; } -Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, - Footer* footer, uint64_t enforce_table_magic_number) { +Status ReadFooterFromFile(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, + uint64_t file_size, Footer* footer, + uint64_t enforce_table_magic_number) { if (file_size < Footer::kMinEncodedLength) { return Status::Corruption( "file is too short (" + ToString(file_size) + " bytes) to be an " @@ -230,9 +232,14 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, (file_size > Footer::kMaxEncodedLength) ? static_cast(file_size - Footer::kMaxEncodedLength) : 0; - Status s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input, - footer_space); - if (!s.ok()) return s; + Status s; + if (prefetch_buffer == nullptr || + !prefetch_buffer->TryReadFromCache(read_offset, Footer::kMaxEncodedLength, + &footer_input)) { + s = file->Read(read_offset, Footer::kMaxEncodedLength, &footer_input, + footer_space); + if (!s.ok()) return s; + } // Check that we actually read the whole footer from the file. It may be // that size isn't correct. @@ -259,6 +266,43 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { +Status CheckBlockChecksum(const ReadOptions& options, const Footer& footer, + const Slice& contents, size_t block_size, + RandomAccessFileReader* file, + const BlockHandle& handle) { + Status s; + // Check the crc of the type and the block contents + if (options.verify_checksums) { + const char* data = contents.data(); // Pointer to where Read put the data + PERF_TIMER_GUARD(block_checksum_time); + uint32_t value = DecodeFixed32(data + block_size + 1); + uint32_t actual = 0; + switch (footer.checksum()) { + case kCRC32c: + value = crc32c::Unmask(value); + actual = crc32c::Value(data, block_size + 1); + break; + case kxxHash: + actual = XXH32(data, static_cast(block_size) + 1, 0); + break; + default: + s = Status::Corruption( + "unknown checksum type " + ToString(footer.checksum()) + " in " + + file->file_name() + " offset " + ToString(handle.offset()) + + " size " + ToString(block_size)); + } + if (s.ok() && actual != value) { + s = Status::Corruption( + "block checksum mismatch: expected " + ToString(actual) + ", got " + + ToString(value) + " in " + file->file_name() + " offset " + + ToString(handle.offset()) + " size " + ToString(block_size)); + } + if (!s.ok()) { + return s; + } + } + return s; +} // Read a block and check its CRC // contents is the result of reading. @@ -281,53 +325,21 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer, return s; } if (contents->size() != n + kBlockTrailerSize) { - return Status::Corruption( - "truncated block read from " + file->file_name() + " offset " - + ToString(handle.offset()) + ", expected " - + ToString(n + kBlockTrailerSize) + " bytes, got " - + ToString(contents->size())); + return Status::Corruption("truncated block read from " + file->file_name() + + " offset " + ToString(handle.offset()) + + ", expected " + ToString(n + kBlockTrailerSize) + + " bytes, got " + ToString(contents->size())); } - - // Check the crc of the type and the block contents - const char* data = contents->data(); // Pointer to where Read put the data - if (options.verify_checksums) { - PERF_TIMER_GUARD(block_checksum_time); - uint32_t value = DecodeFixed32(data + n + 1); - uint32_t actual = 0; - switch (footer.checksum()) { - case kCRC32c: - value = crc32c::Unmask(value); - actual = crc32c::Value(data, n + 1); - break; - case kxxHash: - actual = XXH32(data, static_cast(n) + 1, 0); - break; - default: - s = Status::Corruption( - "unknown checksum type " + ToString(footer.checksum()) - + " in " + file->file_name() + " offset " - + ToString(handle.offset()) + " size " + ToString(n)); - } - if (s.ok() && actual != value) { - s = Status::Corruption( - "block checksum mismatch: expected " + ToString(actual) - + ", got " + ToString(value) + " in " + file->file_name() - + " offset " + ToString(handle.offset()) - + " size " + ToString(n)); - } - if (!s.ok()) { - return s; - } - } - return s; + return CheckBlockChecksum(options, footer, *contents, n, file, handle); } } // namespace -Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& read_options, +Status ReadBlockContents(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, + const Footer& footer, const ReadOptions& read_options, const BlockHandle& handle, BlockContents* contents, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, bool decompression_requested, const Slice& compression_dict, const PersistentCacheOptions& cache_options) { @@ -357,8 +369,21 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, } } - if (cache_options.persistent_cache && - cache_options.persistent_cache->IsCompressed()) { + bool got_from_prefetch_buffer = false; + if (prefetch_buffer != nullptr && + prefetch_buffer->TryReadFromCache( + handle.offset(), + static_cast(handle.size()) + kBlockTrailerSize, &slice)) { + status = + CheckBlockChecksum(read_options, footer, slice, + static_cast(handle.size()), file, handle); + if (!status.ok()) { + return status; + } + got_from_prefetch_buffer = true; + used_buf = const_cast(slice.data()); + } else if (cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { // lookup uncompressed cache mode p-cache status = PersistentCacheHelper::LookupRawPage( cache_options, handle, &heap_buf, n + kBlockTrailerSize); @@ -366,42 +391,44 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, status = Status::NotFound(); } - if (status.ok()) { - // cache hit - used_buf = heap_buf.get(); - slice = Slice(heap_buf.get(), n); - } else { - if (ioptions.info_log && !status.IsNotFound()) { - assert(!status.ok()); - ROCKS_LOG_INFO(ioptions.info_log, - "Error reading from persistent cache. %s", - status.ToString().c_str()); - } - // cache miss read from device - if (decompression_requested && - n + kBlockTrailerSize < DefaultStackBufferSize) { - // If we've got a small enough hunk of data, read it in to the - // trivially allocated stack buffer instead of needing a full malloc() - used_buf = &stack_buf[0]; - } else { - heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); + if (!got_from_prefetch_buffer) { + if (status.ok()) { + // cache hit used_buf = heap_buf.get(); + slice = Slice(heap_buf.get(), n); + } else { + if (ioptions.info_log && !status.IsNotFound()) { + assert(!status.ok()); + ROCKS_LOG_INFO(ioptions.info_log, + "Error reading from persistent cache. %s", + status.ToString().c_str()); + } + // cache miss read from device + if (decompression_requested && + n + kBlockTrailerSize < DefaultStackBufferSize) { + // If we've got a small enough hunk of data, read it in to the + // trivially allocated stack buffer instead of needing a full malloc() + used_buf = &stack_buf[0]; + } else { + heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); + used_buf = heap_buf.get(); + } + + status = ReadBlock(file, footer, read_options, handle, &slice, used_buf); + if (status.ok() && read_options.fill_cache && + cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { + // insert to raw cache + PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf, + n + kBlockTrailerSize); + } } - status = ReadBlock(file, footer, read_options, handle, &slice, used_buf); - if (status.ok() && read_options.fill_cache && - cache_options.persistent_cache && - cache_options.persistent_cache->IsCompressed()) { - // insert to raw cache - PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf, - n + kBlockTrailerSize); + if (!status.ok()) { + return status; } } - if (!status.ok()) { - return status; - } - PERF_TIMER_GUARD(block_decompress_time); compression_type = static_cast(slice.data()[n]); @@ -416,14 +443,14 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, *contents = BlockContents(Slice(slice.data(), n), false, compression_type); } else { // page is uncompressed, the buffer either stack or heap provided - if (used_buf == &stack_buf[0]) { + if (got_from_prefetch_buffer || used_buf == &stack_buf[0]) { heap_buf = std::unique_ptr(new char[n]); - memcpy(heap_buf.get(), stack_buf, n); + memcpy(heap_buf.get(), used_buf, n); } *contents = BlockContents(std::move(heap_buf), n, true, compression_type); } - if (status.ok() && read_options.fill_cache && + if (status.ok() && !got_from_prefetch_buffer && read_options.fill_cache && cache_options.persistent_cache && !cache_options.persistent_cache->IsCompressed()) { // insert to uncompressed cache diff --git a/table/format.h b/table/format.h index d89b1d312c..512b4a32bf 100644 --- a/table/format.h +++ b/table/format.h @@ -18,6 +18,7 @@ #include "options/cf_options.h" #include "port/port.h" // noexcept #include "table/persistent_cache_options.h" +#include "util/file_reader_writer.h" namespace rocksdb { @@ -173,8 +174,9 @@ class Footer { // Read the footer from file // If enforce_table_magic_number != 0, ReadFooterFromFile() will return // corruption if table_magic number is not equal to enforce_table_magic_number -Status ReadFooterFromFile(RandomAccessFileReader* file, uint64_t file_size, - Footer* footer, +Status ReadFooterFromFile(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, + uint64_t file_size, Footer* footer, uint64_t enforce_table_magic_number = 0); // 1-byte type + 32-bit crc @@ -213,9 +215,9 @@ struct BlockContents { // Read the block identified by "handle" from "file". On failure // return non-OK. On success fill *result and return OK. extern Status ReadBlockContents( - RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& options, const BlockHandle& handle, - BlockContents* contents, const ImmutableCFOptions &ioptions, + RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, + const Footer& footer, const ReadOptions& options, const BlockHandle& handle, + BlockContents* contents, const ImmutableCFOptions& ioptions, bool do_uncompress = true, const Slice& compression_dict = Slice(), const PersistentCacheOptions& cache_options = PersistentCacheOptions()); diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 5946e40fe0..1227bb0aeb 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -16,6 +16,7 @@ #include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "util/coding.h" +#include "util/file_reader_writer.h" namespace rocksdb { @@ -159,7 +160,8 @@ bool NotifyCollectTableCollectorsOnFinish( } Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, - const Footer& footer, const ImmutableCFOptions& ioptions, + FilePrefetchBuffer* prefetch_buffer, const Footer& footer, + const ImmutableCFOptions& ioptions, TableProperties** table_properties) { assert(table_properties); @@ -173,8 +175,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, ReadOptions read_options; read_options.verify_checksums = false; Status s; - s = ReadBlockContents(file, footer, read_options, handle, &block_contents, - ioptions, false /* decompress */); + s = ReadBlockContents(file, prefetch_buffer, footer, read_options, handle, + &block_contents, ioptions, false /* decompress */); if (!s.ok()) { return s; @@ -277,7 +279,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, TableProperties** properties) { // -- Read metaindex block Footer footer; - auto s = ReadFooterFromFile(file, file_size, &footer, table_magic_number); + auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, + &footer, table_magic_number); if (!s.ok()) { return s; } @@ -286,8 +289,9 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, ioptions, false /* decompress */); + s = ReadBlockContents(file, nullptr /* prefetch_buffer */, footer, + read_options, metaindex_handle, &metaindex_contents, + ioptions, false /* decompress */); if (!s.ok()) { return s; } @@ -305,7 +309,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, TableProperties table_properties; if (found_properties_block == true) { - s = ReadProperties(meta_iter->value(), file, footer, ioptions, properties); + s = ReadProperties(meta_iter->value(), file, nullptr /* prefetch_buffer */, + footer, ioptions, properties); } else { s = Status::NotFound(); } @@ -332,7 +337,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, const std::string& meta_block_name, BlockHandle* block_handle) { Footer footer; - auto s = ReadFooterFromFile(file, file_size, &footer, table_magic_number); + auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, + &footer, table_magic_number); if (!s.ok()) { return s; } @@ -341,8 +347,9 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, ioptions, false /* do decompression */); + s = ReadBlockContents(file, nullptr /* prefetch_buffer */, footer, + read_options, metaindex_handle, &metaindex_contents, + ioptions, false /* do decompression */); if (!s.ok()) { return s; } @@ -355,14 +362,16 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, return FindMetaBlock(meta_iter.get(), meta_block_name, block_handle); } -Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, +Status ReadMetaBlock(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, uint64_t table_magic_number, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, const std::string& meta_block_name, BlockContents* contents) { Status status; Footer footer; - status = ReadFooterFromFile(file, file_size, &footer, table_magic_number); + status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer, + table_magic_number); if (!status.ok()) { return status; } @@ -372,8 +381,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - status = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, ioptions, + status = ReadBlockContents(file, prefetch_buffer, footer, read_options, + metaindex_handle, &metaindex_contents, ioptions, false /* decompress */); if (!status.ok()) { return status; @@ -394,8 +403,9 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, } // Reading metablock - return ReadBlockContents(file, footer, read_options, block_handle, contents, - ioptions, false /* decompress */); + return ReadBlockContents(file, prefetch_buffer, footer, read_options, + block_handle, contents, ioptions, + false /* decompress */); } } // namespace rocksdb diff --git a/table/meta_blocks.h b/table/meta_blocks.h index ddb685360d..220985d9e1 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -94,7 +94,8 @@ bool NotifyCollectTableCollectorsOnFinish( // *table_properties will point to a heap-allocated TableProperties // object, otherwise value of `table_properties` will not be modified. Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, - const Footer& footer, const ImmutableCFOptions &ioptions, + FilePrefetchBuffer* prefetch_buffer, const Footer& footer, + const ImmutableCFOptions& ioptions, TableProperties** table_properties); // Directly read the properties from the properties block of a plain table. @@ -121,9 +122,10 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, // Read the specified meta block with name meta_block_name // from `file` and initialize `contents` with contents of this block. // Return Status::OK in case of success. -Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, +Status ReadMetaBlock(RandomAccessFileReader* file, + FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, uint64_t table_magic_number, - const ImmutableCFOptions &ioptions, + const ImmutableCFOptions& ioptions, const std::string& meta_block_name, BlockContents* contents); diff --git a/table/partitioned_filter_block.cc b/table/partitioned_filter_block.cc index 2b330039e5..d3d7949d09 100644 --- a/table/partitioned_filter_block.cc +++ b/table/partitioned_filter_block.cc @@ -132,7 +132,8 @@ bool PartitionedFilterBlockReader::KeyMayMatch( return false; } bool cached = false; - auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached); + auto filter_partition = GetFilterPartition(nullptr /* prefetch_buffer */, + &filter_handle, no_io, &cached); if (UNLIKELY(!filter_partition.value)) { return true; } @@ -164,7 +165,8 @@ bool PartitionedFilterBlockReader::PrefixMayMatch( return false; } bool cached = false; - auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached); + auto filter_partition = GetFilterPartition(nullptr /* prefetch_buffer */, + &filter_handle, no_io, &cached); if (UNLIKELY(!filter_partition.value)) { return true; } @@ -194,9 +196,9 @@ Slice PartitionedFilterBlockReader::GetFilterPartitionHandle( } BlockBasedTable::CachableEntry -PartitionedFilterBlockReader::GetFilterPartition(Slice* handle_value, - const bool no_io, - bool* cached) { +PartitionedFilterBlockReader::GetFilterPartition( + FilePrefetchBuffer* prefetch_buffer, Slice* handle_value, const bool no_io, + bool* cached) { BlockHandle fltr_blk_handle; auto s = fltr_blk_handle.DecodeFrom(handle_value); assert(s.ok()); @@ -232,7 +234,8 @@ PartitionedFilterBlockReader::GetFilterPartition(Slice* handle_value, } return filter; } else { - auto filter = table_->ReadFilter(fltr_blk_handle, is_a_filter_partition); + auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle, + is_a_filter_partition); return {filter, nullptr}; } } diff --git a/table/partitioned_filter_block.h b/table/partitioned_filter_block.h index 6c4a5d7b9d..d408175390 100644 --- a/table/partitioned_filter_block.h +++ b/table/partitioned_filter_block.h @@ -86,7 +86,8 @@ class PartitionedFilterBlockReader : public FilterBlockReader { private: Slice GetFilterPartitionHandle(const Slice& entry); BlockBasedTable::CachableEntry GetFilterPartition( - Slice* handle, const bool no_io, bool* cached); + FilePrefetchBuffer* prefetch_buffer, Slice* handle, const bool no_io, + bool* cached); const SliceTransform* prefix_extractor_; std::unique_ptr idx_on_fltr_blk_; diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 92933b34ba..d4d9edb741 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -291,9 +291,10 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, table_properties_.reset(props); BlockContents index_block_contents; - Status s = ReadMetaBlock( - file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_, - PlainTableIndexBuilder::kPlainTableIndexBlock, &index_block_contents); + Status s = ReadMetaBlock(file_info_.file.get(), nullptr /* prefetch_buffer */, + file_size_, kPlainTableMagicNumber, ioptions_, + PlainTableIndexBuilder::kPlainTableIndexBlock, + &index_block_contents); bool index_in_file = s.ok(); @@ -301,9 +302,9 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, bool bloom_in_file = false; // We only need to read the bloom block if index block is in file. if (index_in_file) { - s = ReadMetaBlock(file_info_.file.get(), file_size_, kPlainTableMagicNumber, - ioptions_, BloomBlockBuilder::kBloomBlock, - &bloom_block_contents); + s = ReadMetaBlock(file_info_.file.get(), nullptr /* prefetch_buffer */, + file_size_, kPlainTableMagicNumber, ioptions_, + BloomBlockBuilder::kBloomBlock, &bloom_block_contents); bloom_in_file = s.ok() && bloom_block_contents.data.size() > 0; } diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index e6322f8b4d..fa89e6cdd6 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -79,7 +79,8 @@ Status SstFileReader::GetTableReader(const std::string& file_path) { file_.reset(new RandomAccessFileReader(std::move(file), file_path)); if (s.ok()) { - s = ReadFooterFromFile(file_.get(), file_size, &footer); + s = ReadFooterFromFile(file_.get(), nullptr /* prefetch_buffer */, + file_size, &footer); } if (s.ok()) { magic_number = footer.table_magic_number(); diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 22ab712878..f46b78fa06 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -603,6 +603,34 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { }; } // namespace +Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, + uint64_t offset, size_t n) { + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + uint64_t roundup_offset = Roundup(offset, alignment); + uint64_t roundup_len = Roundup(n, alignment); + buffer_.Alignment(alignment); + buffer_.AllocateNewBuffer(roundup_len); + + Slice result; + Status s = + reader->Read(roundup_offset, roundup_len, &result, buffer_.BufferStart()); + if (s.ok()) { + buffer_offset_ = roundup_offset; + buffer_len_ = result.size(); + } + return s; +} + +bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, + Slice* result) const { + if (offset < buffer_offset_ || offset + n > buffer_offset_ + buffer_len_) { + return false; + } + uint64_t offset_in_buffer = offset - buffer_offset_; + *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); + return true; +} + std::unique_ptr NewReadaheadRandomAccessFile( std::unique_ptr&& file, size_t readahead_size) { std::unique_ptr result( diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index deed73c38d..9be6924582 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -196,6 +196,17 @@ class WritableFileWriter { Status SyncInternal(bool use_fsync); }; +class FilePrefetchBuffer { + public: + Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n); + bool TryReadFromCache(uint64_t offset, size_t n, Slice* result) const; + + private: + AlignedBuffer buffer_; + uint64_t buffer_offset_; + size_t buffer_len_; +}; + extern Status NewWritableFile(Env* env, const std::string& fname, unique_ptr* result, const EnvOptions& options);