rocksdb/table/block_based/block_prefetcher.cc
akankshamahajan f65a0379f0 Implement trimming of readhead size when upper bound is specified (#11684)
Summary:
Implement trimming of readahead_size under a new option ReadOptions.auto_readahead_size. It'll trim the readahead_size during prefetching upto iterate_upper_bound offset only when ReadOptions.iterate_upper_bound is set, therefore reducing the prefetching of data beyond upper_bound.
It's enabled for both implicit auto readahead size and when ReadOptions.readahead_size is specified and for sync and async_io.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11684

Test Plan: Added new unit test

Reviewed By: anand1976

Differential Revision: D48479723

Pulled By: akankshamahajan15

fbshipit-source-id: 2b1703579caf779105e836b580866ffd7db076fc
2023-08-18 15:52:04 -07:00

151 lines
5.7 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_prefetcher.h"
#include "rocksdb/file_system.h"
#include "table/block_based/block_based_table_reader.h"
namespace ROCKSDB_NAMESPACE {
void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle,
const size_t readahead_size,
bool is_for_compaction,
const bool no_sequential_checking,
const ReadOptions& read_options) {
const size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
const size_t offset = handle.offset();
if (is_for_compaction) {
if (!rep->file->use_direct_io()) {
// If FS supports prefetching (readahead_limit_ will be non zero in that
// case) and current block exists in prefetch buffer then return.
if (offset + len <= readahead_limit_) {
return;
}
IOOptions opts;
Status s = rep->file->PrepareIOOptions(read_options, opts);
if (!s.ok()) {
return;
}
s = rep->file->Prefetch(opts, offset, len + compaction_readahead_size_);
if (s.ok()) {
readahead_limit_ = offset + len + compaction_readahead_size_;
return;
}
}
// If FS prefetch is not supported, fall back to use internal prefetch
// buffer. Discarding other return status of Prefetch calls intentionally,
// as we can fallback to reading from disk if Prefetch fails.
//
// num_file_reads is used by FilePrefetchBuffer only when
// implicit_auto_readahead is set.
rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_,
&prefetch_buffer_, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0,
/*upper_bound_offset=*/0);
return;
}
// Explicit user requested readahead.
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(
readahead_size, readahead_size, &prefetch_buffer_,
/*implicit_auto_readahead=*/false, /*num_file_reads=*/0,
/*num_file_reads_for_auto_readahead=*/0, upper_bound_offset_);
return;
}
// Implicit readahead.
// If max_auto_readahead_size is set to be 0 by user, no data will be
// prefetched.
size_t max_auto_readahead_size = rep->table_options.max_auto_readahead_size;
if (max_auto_readahead_size == 0 || initial_auto_readahead_size_ == 0) {
return;
}
if (initial_auto_readahead_size_ > max_auto_readahead_size) {
initial_auto_readahead_size_ = max_auto_readahead_size;
}
// In case of no_sequential_checking, it will skip the num_file_reads_ and
// will always creates the FilePrefetchBuffer.
if (no_sequential_checking) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true,
/*num_file_reads=*/0,
rep->table_options.num_file_reads_for_auto_readahead,
upper_bound_offset_);
return;
}
// If FS supports prefetching (readahead_limit_ will be non zero in that case)
// and current block exists in prefetch buffer then return.
if (offset + len <= readahead_limit_) {
UpdateReadPattern(offset, len);
return;
}
if (!IsBlockSequential(offset)) {
UpdateReadPattern(offset, len);
ResetValues(rep->table_options.initial_auto_readahead_size);
return;
}
UpdateReadPattern(offset, len);
// Implicit auto readahead, which will be enabled if the number of reads
// reached `table_options.num_file_reads_for_auto_readahead` (default: 2) and
// scans are sequential.
num_file_reads_++;
if (num_file_reads_ <= rep->table_options.num_file_reads_for_auto_readahead) {
return;
}
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead,
upper_bound_offset_);
return;
}
if (readahead_size_ > max_auto_readahead_size) {
readahead_size_ = max_auto_readahead_size;
}
// If prefetch is not supported, fall back to use internal prefetch buffer.
// Discarding other return status of Prefetch calls intentionally, as
// we can fallback to reading from disk if Prefetch fails.
IOOptions opts;
Status s = rep->file->PrepareIOOptions(read_options, opts);
if (!s.ok()) {
return;
}
s = rep->file->Prefetch(
opts, handle.offset(),
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead,
upper_bound_offset_);
return;
}
readahead_limit_ = offset + len + readahead_size_;
// Keep exponentially increasing readahead size until
// max_auto_readahead_size.
readahead_size_ = std::min(max_auto_readahead_size, readahead_size_ * 2);
}
} // namespace ROCKSDB_NAMESPACE