diff --git a/HISTORY.md b/HISTORY.md index 7aaee41798..45938eae5f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Added new ChecksumType kXXH3 which is faster than kCRC32c on almost all x86\_64 hardware. * Added a new online consistency check for BlobDB which validates that the number/total size of garbage blobs does not exceed the number/total size of all blobs in any given blob file. * Provided support for tracking per-sst user-defined timestamp information in MANIFEST. +* Added new option "adaptive_readahead" in ReadOptions. For iterators, RocksDB does auto-readahead on noticing sequential reads and by enabling this option, readahead_size of current file (if reads are sequential) will be carried forward to next file instead of starting from the scratch at each level (except L0 level files). If reads are not sequential it will fall back to 8KB. This option is applicable only for RocksDB internal prefetch buffer and isn't supported with underlying file system prefetching. ### Bug Fixes * Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption. diff --git a/db/version_set.cc b/db/version_set.cc index 98a65a685c..d46c7bc505 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -880,7 +880,8 @@ class LevelIterator final : public InternalIterator { level_(level), range_del_agg_(range_del_agg), pinned_iters_mgr_(nullptr), - compaction_boundaries_(compaction_boundaries) { + compaction_boundaries_(compaction_boundaries), + is_next_read_sequential_(false) { // Empty level is not supported. assert(flevel_ != nullptr && flevel_->num_files > 0); } @@ -1027,6 +1028,8 @@ class LevelIterator final : public InternalIterator { // To be propagated to RangeDelAggregator in order to safely truncate range // tombstones. const std::vector* compaction_boundaries_; + + bool is_next_read_sequential_; }; void LevelIterator::Seek(const Slice& target) { @@ -1128,7 +1131,9 @@ bool LevelIterator::NextAndGetResult(IterateResult* result) { assert(Valid()); bool is_valid = file_iter_.NextAndGetResult(result); if (!is_valid) { + is_next_read_sequential_ = true; SkipEmptyFileForward(); + is_next_read_sequential_ = false; is_valid = Valid(); if (is_valid) { result->key = key(); @@ -1195,6 +1200,12 @@ void LevelIterator::SetFileIterator(InternalIterator* iter) { } InternalIterator* old_iter = file_iter_.Set(iter); + + // Update the read pattern for PrefetchBuffer. + if (is_next_read_sequential_) { + file_iter_.UpdateReadaheadState(old_iter); + } + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { pinned_iters_mgr_->PinIterator(old_iter); } else { diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 1fe5a367ee..a04d66a2a9 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -161,6 +161,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, #endif return false; } + TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", + &readahead_size_); readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } else { return false; diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 980496d285..e741a2cba0 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -8,11 +8,13 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include #include #include "file/random_access_file_reader.h" +#include "file/readahead_file_info.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/options.h" @@ -20,6 +22,8 @@ namespace ROCKSDB_NAMESPACE { +#define DEAFULT_DECREMENT 8 * 1024 + // FilePrefetchBuffer is a smart buffer to store and read data from a file. class FilePrefetchBuffer { public: @@ -90,7 +94,14 @@ class FilePrefetchBuffer { // tracked if track_min_offset = true. size_t min_offset_read() const { return min_offset_read_; } - void UpdateReadPattern(const size_t& offset, const size_t& len) { + void UpdateReadPattern(const uint64_t& offset, const size_t& len, + bool is_adaptive_readahead = false) { + if (is_adaptive_readahead) { + // Since this block was eligible for prefetch but it was found in + // cache, so check and decrease the readahead_size by 8KB (default) + // if eligible. + DecreaseReadAheadIfEligible(offset, len); + } prev_offset_ = offset; prev_len_ = len; } @@ -104,11 +115,40 @@ class FilePrefetchBuffer { readahead_size_ = initial_readahead_size_; } + void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { + readahead_info->readahead_size = readahead_size_; + readahead_info->num_file_reads = num_file_reads_; + } + + void DecreaseReadAheadIfEligible(uint64_t offset, size_t size, + size_t value = DEAFULT_DECREMENT) { + // Decrease the readahead_size if + // - its enabled internally by RocksDB (implicit_auto_readahead_) and, + // - readahead_size is greater than 0 and, + // - this block would have called prefetch API if not found in cache for + // which conditions are: + // - few/no bytes are in buffer and, + // - block is sequential with the previous read and, + // - num_file_reads_ + 1 (including this read) > + // kMinNumFileReadsToStartAutoReadahead + if (implicit_auto_readahead_ && readahead_size_ > 0) { + if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) && + IsBlockSequential(offset) && + (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { + readahead_size_ = + std::max(initial_readahead_size_, + (readahead_size_ >= value ? readahead_size_ - value : 0)); + } + } + } + private: AlignedBuffer buffer_; uint64_t buffer_offset_; RandomAccessFileReader* file_reader_; size_t readahead_size_; + // FilePrefetchBuffer object won't be created from Iterator flow if + // max_readahead_size_ = 0. size_t max_readahead_size_; size_t initial_readahead_size_; // The minimum `offset` ever passed to TryReadFromCache(). @@ -120,11 +160,11 @@ class FilePrefetchBuffer { // can be fetched from min_offset_read(). bool track_min_offset_; - // implicit_auto_readahead is enabled by rocksdb internally after 2 sequential - // IOs. + // implicit_auto_readahead is enabled by rocksdb internally after 2 + // sequential IOs. bool implicit_auto_readahead_; - size_t prev_offset_; + uint64_t prev_offset_; size_t prev_len_; - int num_file_reads_; + int64_t num_file_reads_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 569326fe03..6c2769422c 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -670,6 +670,309 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { Close(); } +class PrefetchTest1 : public DBTestBase, + public ::testing::WithParamInterface { + public: + PrefetchTest1() : DBTestBase("prefetch_test1", true) {} +}; + +INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); + +#ifndef ROCKSDB_LITE +TEST_P(PrefetchTest1, DBIterLevelReadAhead) { + const int kNumKeys = 1000; + // Set options + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + int buff_prefetch_count = 0; + int readahead_carry_over_count = 0; + int num_sst_files = NumTableFilesAtLevel(2); + size_t current_readahead_size = 0; + + // Test - Iterate over the keys sequentially. + { + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + + // The callback checks, since reads are sequential, readahead_size doesn't + // start from 8KB when iterator moves to next file and its called + // num_sst_files-1 times (excluding for first file). + SyncPoint::GetInstance()->SetCallBack( + "BlockPrefetcher::SetReadaheadState", [&](void* arg) { + readahead_carry_over_count++; + size_t readahead_size = *reinterpret_cast(arg); + if (readahead_carry_over_count) { + ASSERT_GT(readahead_size, 8 * 1024); + // ASSERT_GE(readahead_size, current_readahead_size); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { + current_readahead_size = *reinterpret_cast(arg); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + ReadOptions ro; + ro.adaptive_readahead = true; + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + num_keys++; + } + + ASSERT_GT(buff_prefetch_count, 0); + buff_prefetch_count = 0; + // For index and data blocks. + ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + } + Close(); +} + +TEST_P(PrefetchTest1, NonSequentialReads) { + const int kNumKeys = 1000; + // Set options + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + + int buff_prefetch_count = 0; + int set_readahead = 0; + size_t readahead_size = 0; + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "BlockPrefetcher::SetReadaheadState", + [&](void* /*arg*/) { set_readahead++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", + [&](void* arg) { readahead_size = *reinterpret_cast(arg); }); + + SyncPoint::GetInstance()->EnableProcessing(); + + { + // Iterate until prefetch is done. + ReadOptions ro; + ro.adaptive_readahead = true; + auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->SeekToFirst(); + while (iter->Valid() && buff_prefetch_count == 0) { + iter->Next(); + } + ASSERT_EQ(readahead_size, 8 * 1024); + ASSERT_EQ(buff_prefetch_count, 1); + ASSERT_EQ(set_readahead, 0); + buff_prefetch_count = 0; + + // Move to last file and check readahead size fallbacks to 8KB. So next + // readahead size after prefetch should be 8 * 1024; + iter->Seek(BuildKey(4004)); + while (iter->Valid() && buff_prefetch_count == 0) { + iter->Next(); + } + ASSERT_EQ(readahead_size, 8 * 1024); + ASSERT_EQ(set_readahead, 0); + ASSERT_EQ(buff_prefetch_count, 1); + } + Close(); +} +#endif //! ROCKSDB_LITE + +TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { + const int kNumKeys = 2000; + // Set options + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB + table_options.block_cache = cache; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + int buff_prefetch_count = 0; + size_t current_readahead_size = 0; + size_t expected_current_readahead_size = 8 * 1024; + size_t decrease_readahead_size = 8 * 1024; + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { + current_readahead_size = *reinterpret_cast(arg); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + { + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. After 2 sequential reads it will prefetch the data block. + * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data + * more initially (2 more data blocks). + */ + auto iter = std::unique_ptr(db_->NewIterator(ro)); + // Warm up the cache + iter->Seek(BuildKey(1011)); + iter->Seek(BuildKey(1015)); + iter->Seek(BuildKey(1019)); + buff_prefetch_count = 0; + } + { + // After caching, blocks will be read from cache (Sequential blocks) + auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->Seek(BuildKey(0)); + iter->Seek(BuildKey(1000)); + iter->Seek(BuildKey(1004)); // Prefetch data (not in cache). + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + + // Missed one sequential block but 1011 is already in buffer so + // readahead will not be reset. + iter->Seek(BuildKey(1011)); + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + + // Eligible to Prefetch data (not in buffer) but block is in cache so no + // prefetch will happen and will result in decrease in readahead_size. + // readahead_size will be 8 * 1024 + iter->Seek(BuildKey(1015)); + expected_current_readahead_size -= decrease_readahead_size; + + // 1016 is the same block as 1015. So no change in readahead_size. + iter->Seek(BuildKey(1016)); + + // Prefetch data (not in buffer) but found in cache. So decrease + // readahead_size. Since it will 0 after decrementing so readahead_size will + // be set to initial value. + iter->Seek(BuildKey(1019)); + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); + + // Prefetch next sequential data. + iter->Seek(BuildKey(1022)); + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + ASSERT_EQ(buff_prefetch_count, 2); + buff_prefetch_count = 0; + } + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/file/readahead_file_info.h b/file/readahead_file_info.h new file mode 100644 index 0000000000..f0208bf2dd --- /dev/null +++ b/file/readahead_file_info.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +// struct ReadaheadFileInfo contains readahead information that is passed from +// one file to another file per level during iterations. This information helps +// iterators to carry forward the internal automatic prefetching readahead value +// to next file during sequential reads instead of starting from the scratch. + +struct ReadaheadFileInfo { + struct ReadaheadInfo { + size_t readahead_size = 0; + int64_t num_file_reads = 0; + }; + + // Used by Data block iterators to update readahead info. + ReadaheadInfo data_block_readahead_info; + + // Used by Index block iterators to update readahead info. + ReadaheadInfo index_block_readahead_info; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 51020831c0..8f49950c7b 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1602,6 +1602,19 @@ struct ReadOptions { // Default: std::numeric_limits::max() uint64_t value_size_soft_limit; + // For iterators, RocksDB does auto-readahead on noticing more than two + // sequential reads for a table file if user doesn't provide readahead_size. + // The readahead starts at 8KB and doubles on every additional read upto + // max_auto_readahead_size only when reads are sequential. However at each + // level, if iterator moves over next file, readahead_size starts again from + // 8KB. + // + // By enabling this option, RocksDB will do some enhancements for + // prefetching the data. + // + // Default: false + bool adaptive_readahead; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/options/options.cc b/options/options.cc index 9a8eee209b..aa16663f9b 100644 --- a/options/options.cc +++ b/options/options.cc @@ -650,7 +650,8 @@ ReadOptions::ReadOptions() iter_start_ts(nullptr), deadline(std::chrono::microseconds::zero()), io_timeout(std::chrono::microseconds::zero()), - value_size_soft_limit(std::numeric_limits::max()) {} + value_size_soft_limit(std::numeric_limits::max()), + adaptive_readahead(false) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -674,6 +675,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) iter_start_ts(nullptr), deadline(std::chrono::microseconds::zero()), io_timeout(std::chrono::microseconds::zero()), - value_size_soft_limit(std::numeric_limits::max()) {} + value_size_soft_limit(std::numeric_limits::max()), + adaptive_readahead(false) {} } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index da854e8733..fee006d15a 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -235,7 +235,6 @@ void BlockBasedTableIterator::InitDataBlock() { block_prefetcher_.PrefetchIfNeeded(rep, data_block_handle, read_options_.readahead_size, is_for_compaction); - Status s; table_->NewDataBlockIterator( read_options_, data_block_handle, &block_iter_, BlockType::kData, diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 181e7824b1..e9172907ad 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -27,11 +27,11 @@ class BlockBasedTableIterator : public InternalIteratorBase { bool check_filter, bool need_upper_bound_check, const SliceTransform* prefix_extractor, TableReaderCaller caller, size_t compaction_readahead_size = 0, bool allow_unprepared_value = false) - : table_(table), + : index_iter_(std::move(index_iter)), + table_(table), read_options_(read_options), icomp_(icomp), user_comparator_(icomp.user_comparator()), - index_iter_(std::move(index_iter)), pinned_iters_mgr_(nullptr), prefix_extractor_(prefix_extractor), lookup_context_(caller), @@ -149,6 +149,27 @@ class BlockBasedTableIterator : public InternalIteratorBase { } } + void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { + if (block_prefetcher_.prefetch_buffer() != nullptr && + read_options_.adaptive_readahead) { + block_prefetcher_.prefetch_buffer()->GetReadaheadState( + &(readahead_file_info->data_block_readahead_info)); + if (index_iter_) { + index_iter_->GetReadaheadState(readahead_file_info); + } + } + } + + void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { + block_prefetcher_.SetReadaheadState( + &(readahead_file_info->data_block_readahead_info)); + if (index_iter_) { + index_iter_->SetReadaheadState(readahead_file_info); + } + } + + std::unique_ptr> index_iter_; + private: enum class IterDirection { kForward, @@ -187,7 +208,6 @@ class BlockBasedTableIterator : public InternalIteratorBase { const ReadOptions& read_options_; const InternalKeyComparator& icomp_; UserComparatorWrapper user_comparator_; - std::unique_ptr> index_iter_; PinnedIteratorsManager* pinned_iters_mgr_; DataBlockIter block_iter_; const SliceTransform* prefix_extractor_; diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 96d8895e2c..75b9994ff7 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1523,8 +1523,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( // Update the block details so that PrefetchBuffer can use the read // pattern to determine if reads are sequential or not for // prefetching. It should also take in account blocks read from cache. - prefetch_buffer->UpdateReadPattern(handle.offset(), - block_size(handle)); + prefetch_buffer->UpdateReadPattern( + handle.offset(), block_size(handle), ro.adaptive_readahead); } } } diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index d9ef162c63..2ecfc751ec 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -62,13 +62,12 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, return; } - size_t initial_auto_readahead_size = BlockBasedTable::kInitAutoReadaheadSize; - if (initial_auto_readahead_size > max_auto_readahead_size) { - initial_auto_readahead_size = max_auto_readahead_size; + if (initial_auto_readahead_size_ > max_auto_readahead_size) { + initial_auto_readahead_size_ = max_auto_readahead_size; } if (rep->file->use_direct_io()) { - rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size, + rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_, max_auto_readahead_size, &prefetch_buffer_, true); return; @@ -84,7 +83,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, Status s = rep->file->Prefetch(handle.offset(), block_size(handle) + readahead_size_); if (s.IsNotSupported()) { - rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size, + rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_, max_auto_readahead_size, &prefetch_buffer_, true); return; diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h index 30b3d5eb2c..35c5eceb50 100644 --- a/table/block_based/block_prefetcher.h +++ b/table/block_based/block_prefetcher.h @@ -19,31 +19,44 @@ class BlockPrefetcher { bool is_for_compaction); FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); } - void UpdateReadPattern(const size_t& offset, const size_t& len) { + void UpdateReadPattern(const uint64_t& offset, const size_t& len) { prev_offset_ = offset; prev_len_ = len; } - bool IsBlockSequential(const size_t& offset) { + bool IsBlockSequential(const uint64_t& offset) { return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); } void ResetValues() { num_file_reads_ = 1; readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; + initial_auto_readahead_size_ = readahead_size_; readahead_limit_ = 0; return; } + void SetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { + num_file_reads_ = readahead_info->num_file_reads; + initial_auto_readahead_size_ = readahead_info->readahead_size; + TEST_SYNC_POINT_CALLBACK("BlockPrefetcher::SetReadaheadState", + &initial_auto_readahead_size_); + } + private: // Readahead size used in compaction, its value is used only if // lookup_context_.caller = kCompaction. size_t compaction_readahead_size_; + // readahead_size_ is used if underlying FS supports prefetching. size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; size_t readahead_limit_ = 0; + // initial_auto_readahead_size_ is used if RocksDB uses internal prefetch + // buffer. + uint64_t initial_auto_readahead_size_ = + BlockBasedTable::kInitAutoReadaheadSize; int64_t num_file_reads_ = 0; - size_t prev_offset_ = 0; + uint64_t prev_offset_ = 0; size_t prev_len_ = 0; std::unique_ptr prefetch_buffer_; }; diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc index 6fce4e40fc..12ac6f96c9 100644 --- a/table/block_based/partitioned_index_iterator.cc +++ b/table/block_based/partitioned_index_iterator.cc @@ -92,7 +92,6 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() { block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle, read_options_.readahead_size, is_for_compaction); - Status s; table_->NewDataBlockIterator( read_options_, partitioned_index_handle, &block_iter_, diff --git a/table/block_based/partitioned_index_iterator.h b/table/block_based/partitioned_index_iterator.h index 2543e45e1b..40ad8bb23d 100644 --- a/table/block_based/partitioned_index_iterator.h +++ b/table/block_based/partitioned_index_iterator.h @@ -27,16 +27,17 @@ class PartitionedIndexIterator : public InternalIteratorBase { const InternalKeyComparator& icomp, std::unique_ptr>&& index_iter, TableReaderCaller caller, size_t compaction_readahead_size = 0) - : table_(table), + : index_iter_(std::move(index_iter)), + table_(table), read_options_(read_options), #ifndef NDEBUG icomp_(icomp), #endif user_comparator_(icomp.user_comparator()), - index_iter_(std::move(index_iter)), block_iter_points_to_real_block_(false), lookup_context_(caller), - block_prefetcher_(compaction_readahead_size) {} + block_prefetcher_(compaction_readahead_size) { + } ~PartitionedIndexIterator() override {} @@ -113,6 +114,21 @@ class PartitionedIndexIterator : public InternalIteratorBase { } } + void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { + if (block_prefetcher_.prefetch_buffer() != nullptr && + read_options_.adaptive_readahead) { + block_prefetcher_.prefetch_buffer()->GetReadaheadState( + &(readahead_file_info->index_block_readahead_info)); + } + } + + void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { + block_prefetcher_.SetReadaheadState( + &(readahead_file_info->index_block_readahead_info)); + } + + std::unique_ptr> index_iter_; + private: friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test; const BlockBasedTable* table_; @@ -121,7 +137,6 @@ class PartitionedIndexIterator : public InternalIteratorBase { const InternalKeyComparator& icomp_; #endif UserComparatorWrapper user_comparator_; - std::unique_ptr> index_iter_; IndexBlockIter block_iter_; // True if block_iter_ is initialized and points to the same block diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index 5a0e414684..f5934e691b 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -79,6 +79,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( ro.fill_cache = read_options.fill_cache; ro.deadline = read_options.deadline; ro.io_timeout = read_options.io_timeout; + ro.adaptive_readahead = read_options.adaptive_readahead; // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. std::unique_ptr> index_iter( diff --git a/table/internal_iterator.h b/table/internal_iterator.h index c4382a54e6..06d14ad352 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -7,7 +7,9 @@ #pragma once #include + #include "db/dbformat.h" +#include "file/readahead_file_info.h" #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" #include "rocksdb/status.h" @@ -172,6 +174,18 @@ class InternalIteratorBase : public Cleanable { return Status::NotSupported(""); } + // When iterator moves from one file to another file at same level, new file's + // readahead state (details of last block read) is updated with previous + // file's readahead state. This way internal readahead_size of Prefetch Buffer + // doesn't start from scratch and can fall back to 8KB with no prefetch if + // reads are not sequential. + // + // Default implementation is no-op and its implemented by iterators. + virtual void GetReadaheadState(ReadaheadFileInfo* /*readahead_file_info*/) {} + + // Default implementation is no-op and its implemented by iterators. + virtual void SetReadaheadState(ReadaheadFileInfo* /*readahead_file_info*/) {} + protected: void SeekForPrevImpl(const Slice& target, const Comparator* cmp) { Seek(target); diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index ff46f2536c..134d93d082 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -154,6 +154,14 @@ class IteratorWrapperBase { return iter_->user_key(); } + void UpdateReadaheadState(InternalIteratorBase* old_iter) { + if (old_iter && iter_) { + ReadaheadFileInfo readahead_file_info; + old_iter->GetReadaheadState(&readahead_file_info); + iter_->SetReadaheadState(&readahead_file_info); + } + } + private: void Update() { valid_ = iter_->Valid(); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 52c7c2b53f..8e6795e944 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1049,6 +1049,10 @@ DEFINE_bool(io_uring_enabled, true, extern "C" bool RocksDbIOUringEnable() { return FLAGS_io_uring_enabled; } #endif // ROCKSDB_LITE +DEFINE_bool(adaptive_readahead, false, + "carry forward internal auto readahead size from one file to next " + "file at each level during iteration"); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -5491,6 +5495,7 @@ class Benchmark { options.timestamp = &ts; } + options.adaptive_readahead = FLAGS_adaptive_readahead; Iterator* iter = db->NewIterator(options); int64_t i = 0; int64_t bytes = 0; @@ -5585,7 +5590,9 @@ class Benchmark { } void ReadReverse(ThreadState* thread, DB* db) { - Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); + ReadOptions options(FLAGS_verify_checksum, true); + options.adaptive_readahead = FLAGS_adaptive_readahead; + Iterator* iter = db->NewIterator(options); int64_t i = 0; int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { @@ -6375,6 +6382,7 @@ class Benchmark { options.prefix_same_as_start = FLAGS_prefix_same_as_start; options.tailing = FLAGS_use_tailing_iterator; options.readahead_size = FLAGS_readahead_size; + options.adaptive_readahead = FLAGS_adaptive_readahead; std::unique_ptr ts_guard; Slice ts; if (user_timestamp_size_ > 0) { @@ -6671,6 +6679,7 @@ class Benchmark { ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); read_options.timestamp = &ts; } + read_options.adaptive_readahead = FLAGS_adaptive_readahead; Iterator* iter = db_.db->NewIterator(read_options); fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_); @@ -7271,6 +7280,7 @@ class Benchmark { DB* db = SelectDB(thread); ReadOptions read_opts(FLAGS_verify_checksum, true); + read_opts.adaptive_readahead = FLAGS_adaptive_readahead; std::unique_ptr ts_guard; Slice ts; if (user_timestamp_size_ > 0) {