Avoid double block cache lookup during Seek with async_io option (#11616)

Summary:
With the async_io option, Seek happens in two phases. Phase 1 starts an asynchronous read on a block cache miss, and phase 2 waits for the read to complete and finishes the seek. In both phases, BlockBasedTable::NewDataBlockIterator is called, which looks the data block up in the block cache before checking the prefetch buffer. This change performs the block cache lookup only in the first phase, saving some CPU.
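
For illustration, a minimal self-contained sketch of the two-phase pattern; SimpleCache and AsyncPrefetchBuffer are hypothetical stand-ins for RocksDB's block cache and FilePrefetchBuffer, not the actual implementation:

#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

struct SimpleCache {  // hypothetical stand-in for the block cache
  std::unordered_map<uint64_t, std::string> blocks;
  std::optional<std::string> Lookup(uint64_t handle) const {
    auto it = blocks.find(handle);
    if (it == blocks.end()) return std::nullopt;
    return it->second;
  }
};

struct AsyncPrefetchBuffer {  // hypothetical stand-in for FilePrefetchBuffer
  void ReadAsync(uint64_t handle) { pending_ = handle; }
  std::string WaitForRead(uint64_t handle) {
    // A real implementation would block here until the async I/O completes.
    return "data-block-" + std::to_string(handle);
  }
  std::optional<uint64_t> pending_;
};

// Phase 1: the only block cache lookup. On a miss, kick off the async read.
// Returns true if the block was served from cache.
bool SeekPhase1(const SimpleCache& cache, AsyncPrefetchBuffer& pfb,
                uint64_t handle, std::string* block) {
  if (auto cached = cache.Lookup(handle)) {  // use_block_cache_for_lookup=true
    *block = *cached;
    return true;
  }
  pfb.ReadAsync(handle);
  return false;  // caller must finish the seek with SeekPhase2()
}

// Phase 2: phase 1 already proved the block is absent from the cache, so the
// second lookup is skipped and the block is read from the prefetch buffer.
void SeekPhase2(AsyncPrefetchBuffer& pfb, uint64_t handle,
                std::string* block) {
  *block = pfb.WaitForRead(handle);  // use_block_cache_for_lookup=false
}

int main() {
  SimpleCache cache;
  AsyncPrefetchBuffer pfb;
  std::string block;
  if (!SeekPhase1(cache, pfb, /*handle=*/42, &block)) {  // cache miss
    SeekPhase2(pfb, /*handle=*/42, &block);  // no second cache lookup
  }
  return 0;
}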

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11616

Test Plan: Added unit test

Reviewed By: jaykorean

Differential Revision: D47477887

Pulled By: akankshamahajan15

fbshipit-source-id: 0355e0a68fc0ea2eb92340ae42735afcdbcbfd79
akankshamahajan 2023-09-18 11:32:30 -07:00 committed by Facebook GitHub Bot
parent 6997a06c63
commit 5b5b011cdd
13 changed files with 157 additions and 81 deletions


@@ -1459,6 +1459,69 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
Close();
}
TEST_P(PrefetchTest, AvoidBlockCacheLookupTwice) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool async_io = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 4MB
table_options.block_cache = cache;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
// Write to DB.
{
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
}
ReadOptions ro;
ro.async_io = async_io;
// Iterate over the keys.
{
// Each block contains around 4 keys.
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
ASSERT_OK(options.statistics->Reset());
iter->Seek(BuildKey(99)); // Prefetch data because of seek parallelization.
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_MISS),
1);
}
Close();
}
TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");


@@ -303,7 +303,8 @@ void BlockBasedTableIterator::InitDataBlock() {
read_options_, data_block_handle, &block_iter_, BlockType::kData,
/*get_context=*/nullptr, &lookup_context_,
block_prefetcher_.prefetch_buffer(),
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s);
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
/*use_block_cache_for_lookup=*/true);
block_iter_points_to_real_block_ = true;
CheckDataBlockWithinUpperBound();
if (!is_for_compaction &&
@@ -349,7 +350,8 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
read_options_, data_block_handle, &block_iter_, BlockType::kData,
/*get_context=*/nullptr, &lookup_context_,
block_prefetcher_.prefetch_buffer(),
/*for_compaction=*/is_for_compaction, /*async_read=*/true, s);
/*for_compaction=*/is_for_compaction, /*async_read=*/true, s,
/*use_block_cache_for_lookup=*/true);
if (s.IsTryAgain()) {
async_read_in_progress_ = true;
@@ -364,7 +366,8 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
read_options_, data_block_handle, &block_iter_, BlockType::kData,
/*get_context=*/nullptr, &lookup_context_,
block_prefetcher_.prefetch_buffer(),
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s);
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
/*use_block_cache_for_lookup=*/false);
}
block_iter_points_to_real_block_ = true;
CheckDataBlockWithinUpperBound();


@@ -88,19 +88,20 @@ CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
// Explicitly instantiate templates for each "blocklike" type we use (and
// before implicit specialization).
// This makes it possible to keep the template definitions in the .cc file.
#define INSTANTIATE_BLOCKLIKE_TEMPLATES(T) \
template Status BlockBasedTable::RetrieveBlock<T>( \
FilePrefetchBuffer * prefetch_buffer, const ReadOptions& ro, \
const BlockHandle& handle, const UncompressionDict& uncompression_dict, \
CachableEntry<T>* out_parsed_block, GetContext* get_context, \
BlockCacheLookupContext* lookup_context, bool for_compaction, \
bool use_cache, bool async_read) const; \
template Status BlockBasedTable::MaybeReadBlockAndLoadToCache<T>( \
FilePrefetchBuffer * prefetch_buffer, const ReadOptions& ro, \
const BlockHandle& handle, const UncompressionDict& uncompression_dict, \
bool for_compaction, CachableEntry<T>* block_entry, \
GetContext* get_context, BlockCacheLookupContext* lookup_context, \
BlockContents* contents, bool async_read) const;
#define INSTANTIATE_BLOCKLIKE_TEMPLATES(T) \
template Status BlockBasedTable::RetrieveBlock<T>( \
FilePrefetchBuffer * prefetch_buffer, const ReadOptions& ro, \
const BlockHandle& handle, const UncompressionDict& uncompression_dict, \
CachableEntry<T>* out_parsed_block, GetContext* get_context, \
BlockCacheLookupContext* lookup_context, bool for_compaction, \
bool use_cache, bool async_read, bool use_block_cache_for_lookup) const; \
template Status BlockBasedTable::MaybeReadBlockAndLoadToCache<T>( \
FilePrefetchBuffer * prefetch_buffer, const ReadOptions& ro, \
const BlockHandle& handle, const UncompressionDict& uncompression_dict, \
bool for_compaction, CachableEntry<T>* block_entry, \
GetContext* get_context, BlockCacheLookupContext* lookup_context, \
BlockContents* contents, bool async_read, \
bool use_block_cache_for_lookup) const;
INSTANTIATE_BLOCKLIKE_TEMPLATES(ParsedFullFilterBlock);
INSTANTIATE_BLOCKLIKE_TEMPLATES(UncompressionDict);
@@ -994,7 +995,8 @@ Status BlockBasedTable::ReadRangeDelBlock(
read_options, range_del_handle,
/*input_iter=*/nullptr, BlockType::kRangeDeletion,
/*get_context=*/nullptr, lookup_context, prefetch_buffer,
/*for_compaction= */ false, /*async_read= */ false, tmp_status));
/*for_compaction= */ false, /*async_read= */ false, tmp_status,
/*use_block_cache_for_lookup=*/true));
assert(iter != nullptr);
s = iter->status();
if (!s.ok()) {
@@ -1477,12 +1479,12 @@ BlockBasedTable::MaybeReadBlockAndLoadToCache(
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
bool for_compaction, CachableEntry<TBlocklike>* out_parsed_block,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents, bool async_read) const {
BlockContents* contents, bool async_read,
bool use_block_cache_for_lookup) const {
assert(out_parsed_block != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
BlockCacheInterface<TBlocklike> block_cache{
rep_->table_options.block_cache.get()};
// First, try to get the block from the cache
//
// If either block cache is enabled, we'll try to read from it.
@@ -1496,21 +1498,25 @@ BlockBasedTable::MaybeReadBlockAndLoadToCache(
key = key_data.AsSlice();
if (!contents) {
s = GetDataBlockFromCache(key, block_cache, out_parsed_block,
get_context);
// Value could still be null at this point, so check the cache handle
// and update the read pattern for prefetching
if (out_parsed_block->GetValue() || out_parsed_block->GetCacheHandle()) {
// TODO(haoyu): Differentiate cache hit on uncompressed block cache and
// compressed block cache.
is_cache_hit = true;
if (prefetch_buffer) {
// Update the block details so that PrefetchBuffer can use the read
// pattern to determine if reads are sequential or not for
// prefetching. It should also take into account blocks read from cache.
prefetch_buffer->UpdateReadPattern(
handle.offset(), BlockSizeWithTrailer(handle),
ro.adaptive_readahead /*decrease_readahead_size*/);
if (use_block_cache_for_lookup) {
s = GetDataBlockFromCache(key, block_cache, out_parsed_block,
get_context);
// Value could still be null at this point, so check the cache handle
// and update the read pattern for prefetching
if (out_parsed_block->GetValue() ||
out_parsed_block->GetCacheHandle()) {
// TODO(haoyu): Differentiate cache hit on uncompressed block cache
// and compressed block cache.
is_cache_hit = true;
if (prefetch_buffer) {
// Update the block details so that PrefetchBuffer can use the read
// pattern to determine if reads are sequential or not for
// prefetching. It should also take into account blocks read from
// cache.
prefetch_buffer->UpdateReadPattern(
handle.offset(), BlockSizeWithTrailer(handle),
ro.adaptive_readahead /*decrease_readahead_size*/);
}
}
}
}
@@ -1693,7 +1699,7 @@ WithBlocklikeCheck<Status, TBlocklike> BlockBasedTable::RetrieveBlock(
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<TBlocklike>* out_parsed_block, GetContext* get_context,
BlockCacheLookupContext* lookup_context, bool for_compaction,
bool use_cache, bool async_read) const {
bool use_cache, bool async_read, bool use_block_cache_for_lookup) const {
assert(out_parsed_block);
assert(out_parsed_block->IsEmpty());
@@ -1702,7 +1708,7 @@ WithBlocklikeCheck<Status, TBlocklike> BlockBasedTable::RetrieveBlock(
s = MaybeReadBlockAndLoadToCache(
prefetch_buffer, ro, handle, uncompression_dict, for_compaction,
out_parsed_block, get_context, lookup_context,
/*contents=*/nullptr, async_read);
/*contents=*/nullptr, async_read, use_block_cache_for_lookup);
if (!s.ok()) {
return s;
@@ -2165,7 +2171,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
NewDataBlockIterator<DataBlockIter>(
read_options, v.handle, &biter, BlockType::kData, get_context,
&lookup_data_block_context, /*prefetch_buffer=*/nullptr,
/*for_compaction=*/false, /*async_read=*/false, tmp_status);
/*for_compaction=*/false, /*async_read=*/false, tmp_status,
/*use_block_cache_for_lookup=*/true);
if (no_io && biter.status().IsIncomplete()) {
// couldn't get block from block_cache
@@ -2335,7 +2342,7 @@ Status BlockBasedTable::Prefetch(const ReadOptions& read_options,
read_options, block_handle, &biter, /*type=*/BlockType::kData,
/*get_context=*/nullptr, &lookup_context,
/*prefetch_buffer=*/nullptr, /*for_compaction=*/false,
/*async_read=*/false, tmp_status);
/*async_read=*/false, tmp_status, /*use_block_cache_for_lookup=*/true);
if (!biter.status().ok()) {
// there was an unexpected error while pre-fetching
@@ -2760,7 +2767,7 @@ Status BlockBasedTable::GetKVPairsFromDataBlocks(
/*input_iter=*/nullptr, /*type=*/BlockType::kData,
/*get_context=*/nullptr, /*lookup_context=*/nullptr,
/*prefetch_buffer=*/nullptr, /*for_compaction=*/false,
/*async_read=*/false, tmp_status));
/*async_read=*/false, tmp_status, /*use_block_cache_for_lookup=*/true));
s = datablock_iter->status();
if (!s.ok()) {
@@ -2999,7 +3006,7 @@ Status BlockBasedTable::DumpDataBlocks(std::ostream& out_stream) {
/*input_iter=*/nullptr, /*type=*/BlockType::kData,
/*get_context=*/nullptr, /*lookup_context=*/nullptr,
/*prefetch_buffer=*/nullptr, /*for_compaction=*/false,
/*async_read=*/false, tmp_status));
/*async_read=*/false, tmp_status, /*use_block_cache_for_lookup=*/true));
s = datablock_iter->status();
if (!s.ok()) {


@@ -287,14 +287,12 @@ class BlockBasedTable : public TableReader {
// input_iter: if it is not null, update this one and return it as Iterator
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(const ReadOptions& ro,
const BlockHandle& block_handle,
TBlockIter* input_iter, BlockType block_type,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
FilePrefetchBuffer* prefetch_buffer,
bool for_compaction, bool async_read,
Status& s) const;
TBlockIter* NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& block_handle,
TBlockIter* input_iter, BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
Status& s, bool use_block_cache_for_lookup) const;
// input_iter: if it is not null, update this one and return it as Iterator
template <typename TBlockIter>
@@ -351,7 +349,8 @@ class BlockBasedTable : public TableReader {
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
bool for_compaction, CachableEntry<TBlocklike>* block_entry,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents, bool async_read) const;
BlockContents* contents, bool async_read,
bool use_block_cache_for_lookup) const;
// Similar to the above, with one crucial difference: it will retrieve the
// block from the file even if there are no caches configured (assuming the
@@ -362,7 +361,7 @@ class BlockBasedTable : public TableReader {
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<TBlocklike>* block_entry, GetContext* get_context,
BlockCacheLookupContext* lookup_context, bool for_compaction,
bool use_cache, bool async_read) const;
bool use_cache, bool async_read, bool use_block_cache_for_lookup) const;
template <typename TBlocklike>
WithBlocklikeCheck<void, TBlocklike> SaveLookupContextOrTraceRecord(


@@ -49,7 +49,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
Status& s) const {
Status& s, bool use_block_cache_for_lookup) const {
using IterBlocklike = typename IterTraits<TBlockIter>::IterBlocklike;
PERF_TIMER_GUARD(new_table_block_iter_nanos);
@@ -77,15 +77,15 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
s = RetrieveBlock(prefetch_buffer, ro, handle, dict,
&block.As<IterBlocklike>(), get_context, lookup_context,
for_compaction,
/* use_cache */ true, async_read);
s = RetrieveBlock(
prefetch_buffer, ro, handle, dict, &block.As<IterBlocklike>(),
get_context, lookup_context, for_compaction,
/* use_cache */ true, async_read, use_block_cache_for_lookup);
} else {
s = RetrieveBlock(
prefetch_buffer, ro, handle, UncompressionDict::GetEmptyDict(),
&block.As<IterBlocklike>(), get_context, lookup_context, for_compaction,
/* use_cache */ true, async_read);
/* use_cache */ true, async_read, use_block_cache_for_lookup);
}
if (s.IsTryAgain() && async_read) {


@@ -50,12 +50,12 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
}
// XXX: use_cache=true means double cache query?
statuses[idx_in_batch] =
RetrieveBlock(nullptr, options, handle, uncompression_dict,
&results[idx_in_batch].As<Block_kData>(),
mget_iter->get_context, /* lookup_context */ nullptr,
/* for_compaction */ false, /* use_cache */ true,
/* async_read */ false);
statuses[idx_in_batch] = RetrieveBlock(
nullptr, options, handle, uncompression_dict,
&results[idx_in_batch].As<Block_kData>(), mget_iter->get_context,
/* lookup_context */ nullptr,
/* for_compaction */ false, /* use_cache */ true,
/* async_read */ false, /* use_block_cache_for_lookup */ true);
}
assert(idx_in_batch == handles->size());
CO_RETURN;
@@ -269,7 +269,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
nullptr, options, handle, uncompression_dict,
/*for_compaction=*/false, block_entry, mget_iter->get_context,
/*lookup_context=*/nullptr, &serialized_block,
/*async_read=*/false);
/*async_read=*/false, /*use_block_cache_for_lookup=*/true);
// block_entry value could be null if no block cache is present, i.e
// BlockBasedTableOptions::no_block_cache is true and no compressed
@@ -628,7 +628,8 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
read_options, iiter->value().handle, &next_biter,
BlockType::kData, get_context, lookup_data_block_context,
/* prefetch_buffer= */ nullptr, /* for_compaction = */ false,
/*async_read = */ false, tmp_s);
/*async_read = */ false, tmp_s,
/* use_block_cache_for_lookup = */ true);
biter = &next_biter;
reusing_prev_block = false;
later_reused = false;


@@ -28,12 +28,12 @@ Status FilterBlockReaderCommon<TBlocklike>::ReadFilterBlock(
const BlockBasedTable::Rep* const rep = table->get_rep();
assert(rep);
const Status s =
table->RetrieveBlock(prefetch_buffer, read_options, rep->filter_handle,
UncompressionDict::GetEmptyDict(), filter_block,
get_context, lookup_context,
/* for_compaction */ false, use_cache,
/* async_read */ false);
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->filter_handle,
UncompressionDict::GetEmptyDict(), filter_block, get_context,
lookup_context,
/* for_compaction */ false, use_cache,
/* async_read */ false, /* use_block_cache_for_lookup */ true);
return s;
}


@@ -29,7 +29,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
prefetch_buffer, read_options, rep->index_handle,
UncompressionDict::GetEmptyDict(), &index_block->As<Block_kIndex>(),
get_context, lookup_context, /* for_compaction */ false, use_cache,
/* async_read */ false);
/* async_read */ false, /* use_block_cache_for_lookup */ true);
return s;
}


@@ -317,12 +317,12 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
read_options.read_tier = kBlockCacheTier;
}
const Status s =
table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
UncompressionDict::GetEmptyDict(), filter_block,
get_context, lookup_context,
/* for_compaction */ false, /* use_cache */ true,
/* async_read */ false);
const Status s = table()->RetrieveBlock(
prefetch_buffer, read_options, fltr_blk_handle,
UncompressionDict::GetEmptyDict(), filter_block, get_context,
lookup_context,
/* for_compaction */ false, /* use_cache */ true,
/* async_read */ false, /* use_block_cache_for_lookup */ true);
return s;
}
@@ -521,7 +521,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(
prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
handle, UncompressionDict::GetEmptyDict(),
/* for_compaction */ false, &block, nullptr /* get_context */,
&lookup_context, nullptr /* contents */, false);
&lookup_context, nullptr /* contents */, false,
/* use_block_cache_for_lookup */ true);
if (!s.ok()) {
return s;
}


@@ -98,7 +98,8 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
BlockType::kIndex,
/*get_context=*/nullptr, &lookup_context_,
block_prefetcher_.prefetch_buffer(),
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s);
/*for_compaction=*/is_for_compaction, /*async_read=*/false, s,
/*use_block_cache_for_lookup=*/true);
block_iter_points_to_real_block_ = true;
// We could check upper bound here but it is complicated to reason about
// upper bound in index iterator. On the other hand, in large scans, index


@@ -200,7 +200,7 @@ Status PartitionIndexReader::CacheDependencies(
handle, UncompressionDict::GetEmptyDict(),
/*for_compaction=*/false, &block.As<Block_kIndex>(),
/*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
/*async_read=*/false);
/*async_read=*/false, /*use_block_cache_for_lookup=*/true);
if (!s.ok()) {
return s;


@@ -63,7 +63,7 @@ Status UncompressionDictReader::ReadUncompressionDictionary(
UncompressionDict::GetEmptyDict(), uncompression_dict, get_context,
lookup_context,
/* for_compaction */ false, use_cache,
/* async_read */ false);
/* async_read */ false, /* use_block_cache_for_lookup */ true);
if (!s.ok()) {
ROCKS_LOG_WARN(


@@ -0,0 +1 @@
During async_io, a Seek happens in 2 phases. Phase 1 starts an asynchronous read on a block cache miss, and phase 2 waits for it to complete and finishes the seek. In both phases, the data block is looked up in the block cache before checking the prefetch buffer. The lookup is now done only in the first phase, saving some CPU.
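
As a usage note, this optimization kicks in on ordinary iterator reads once ReadOptions::async_io is set. A minimal sketch using the public RocksDB API (the DB path and key are illustrative, error handling reduced to asserts):

#include <cassert>
#include <memory>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/asyncio_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ro;
  ro.async_io = true;  // enables the two-phase Seek described above
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  it->Seek("key42");  // both phases run inside this call
  if (it->Valid()) {
    // consume it->key() / it->value()
  }
  it.reset();  // release the iterator before closing the DB
  delete db;
  return 0;
}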