Refactor FilePrefetchBuffer code (#12097)

Summary:
- Implementation:
FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) of size num_buffers_ and a deque of buffers (bufs_) that contain the prefetched data. Whenever a buffer is consumed or becomes outdated (w.r.t. the requested offset), that buffer is cleared and returned to free_bufs_.

If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for prefetching. num_buffers_ defines how many buffers are maintained that contain prefetched data.
If num_buffers_ == 1, it's a sequential read flow: the Read API is called on that one buffer whenever the requested data is not in the buffer.
If num_buffers_ > 1, then the data is prefetched asynchronously into the buffers whenever data is consumed from a buffer and that buffer is freed.
If num_buffers_ > 1, the requested data can also overlap two buffers. To return one contiguous buffer, overlap_buf_ is used: the requested data is copied from the two buffers into overlap_buf_, which is returned to the caller.
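
A minimal sketch of this buffer rotation (illustrative only; it assumes the bufs_/free_bufs_ deques and BufferInfo helpers from the header diff below, and omits alignment, polling, and error handling):

  // Hypothetical helper showing the rotation described above.
  void RotateBuffersSketch(uint64_t offset) {
    // Consumed/outdated buffers are cleared and returned to free_bufs_.
    while (!bufs_.empty() && bufs_.front()->IsBufferOutdated(offset)) {
      BufferInfo* buf = bufs_.front();
      bufs_.pop_front();
      buf->ClearBuffer();
      free_bufs_.emplace_back(buf);
    }
    // Free buffers are moved to bufs_ and sent for prefetching.
    while (!free_bufs_.empty()) {
      BufferInfo* buf = free_bufs_.front();
      free_bufs_.pop_front();
      bufs_.emplace_back(buf);
      // Read (num_buffers_ == 1) or ReadAsync (num_buffers_ > 1) fills buf.
    }
  }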

- Merged Sync and Async code flow into one in FilePrefetchBuffer.
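
For callers, the merged flow looks roughly like this (a sketch modeled on the unit-test changes below; the parameter values, fs(), and reader r are illustrative):

  ReadaheadParams readahead_params;
  readahead_params.initial_readahead_size = 8192;
  readahead_params.max_readahead_size = 16384;
  readahead_params.num_buffers = 2;  // > 1 selects the async multi-buffer flow
  FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
                         /*track_min_offset=*/false, fs());

  IOOptions io_opts;
  io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
  Slice result;
  Status s;
  // One entry point for both sync and async flows; TryReadFromCacheAsync is
  // removed.
  fpb.TryReadFromCache(io_opts, r.get(), /*offset=*/3000, /*n=*/4000, &result,
                       &s);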

Test Plan -
- Crash test passed
- Unit tests
- Pending - Benchmarks

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12097

Reviewed By: ajkr

Differential Revision: D51759552

Pulled By: akankshamahajan15

fbshipit-source-id: 69a352945affac2ed22be96048d55863e0168ad5
akankshamahajan 2024-01-05 09:29:01 -08:00 committed by Facebook GitHub Bot
parent ed46981bea
commit 5cb2d09d47
16 changed files with 939 additions and 921 deletions


@ -11,8 +11,10 @@ FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer(
uint64_t file_number) {
auto& prefetch_buffer = prefetch_buffers_[file_number];
if (!prefetch_buffer) {
prefetch_buffer.reset(
new FilePrefetchBuffer(readahead_size_, readahead_size_));
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size_;
readahead_params.max_readahead_size = readahead_size_;
prefetch_buffer.reset(new FilePrefetchBuffer(readahead_params));
}
return prefetch_buffer.get();

env/fs_posix.cc

@ -998,9 +998,7 @@ class PosixFileSystem : public FileSystem {
}
#endif // ROCKSDB_IOURING_PRESENT
// EXPERIMENTAL
//
// TODO akankshamahajan:
// TODO:
// 1. Update Poll API to take into account min_completions
// and return once the number of completed handles in io_handles (any
// order) is at least min_completions.

(File diff suppressed because it is too large.)


@ -11,6 +11,7 @@
#include <algorithm>
#include <atomic>
#include <deque>
#include <sstream>
#include <string>
@ -31,10 +32,37 @@ namespace ROCKSDB_NAMESPACE {
struct IOOptions;
class RandomAccessFileReader;
struct ReadaheadParams {
ReadaheadParams() {}
// The initial readahead size.
size_t initial_readahead_size = 0;
// The maximum readahead size.
// If max_readahead_size > initial_readahead_size, then the readahead size
// will be doubled on every IO until max_readahead_size is hit. Typically
// this is set as a multiple of initial_readahead_size. max_readahead_size
// should be greater than or equal to initial_readahead_size.
size_t max_readahead_size = 0;
// If true, readahead is enabled implicitly by RocksDB
// after doing sequential scans for num_file_reads_for_auto_readahead.
bool implicit_auto_readahead = false;
// TODO akanksha - Remove num_file_reads when BlockPrefetcher is refactored.
uint64_t num_file_reads = 0;
uint64_t num_file_reads_for_auto_readahead = 0;
// Number of buffers to maintain that contain prefetched data. If num_buffers
// > 1, then buffers will be filled asynchronously whenever they get emptied.
size_t num_buffers = 1;
};
struct BufferInfo {
void ClearBuffer() {
buffer_.Clear();
initial_end_offset_ = 0;
async_req_len_ = 0;
}
AlignedBuffer buffer_;
@ -55,9 +83,6 @@ struct BufferInfo {
IOHandleDeleter del_fn_ = nullptr;
// pos represents the index of this buffer in vector of BufferInfo.
uint32_t pos_ = 0;
// initial_end_offset is used to keep track of the end offset of the buffer
// that was originally called. It's helpful in case of autotuning of readahead
// size when callback is made to BlockBasedTableIterator.
@ -69,6 +94,39 @@ struct BufferInfo {
// prefetch call, start_offset should be initialized to 100, i.e., start_offset =
// buf->initial_end_offset_.
uint64_t initial_end_offset_ = 0;
bool IsDataBlockInBuffer(uint64_t offset, size_t length) {
assert(async_read_in_progress_ == false);
return (offset >= offset_ &&
offset + length <= offset_ + buffer_.CurrentSize());
}
bool IsOffsetInBuffer(uint64_t offset) {
assert(async_read_in_progress_ == false);
return (offset >= offset_ && offset < offset_ + buffer_.CurrentSize());
}
bool DoesBufferContainData() {
assert(async_read_in_progress_ == false);
return buffer_.CurrentSize() > 0;
}
bool IsBufferOutdated(uint64_t offset) {
return (!async_read_in_progress_ && DoesBufferContainData() &&
offset >= offset_ + buffer_.CurrentSize());
}
bool IsBufferOutdatedWithAsyncProgress(uint64_t offset) {
return (async_read_in_progress_ && io_handle_ != nullptr &&
offset >= offset_ + async_req_len_);
}
bool IsOffsetInBufferWithAsyncProgress(uint64_t offset) {
return (async_read_in_progress_ && offset >= offset_ &&
offset < offset_ + async_req_len_);
}
size_t CurrentSize() { return buffer_.CurrentSize(); }
};
enum class FilePrefetchBufferUsage {
@ -77,68 +135,87 @@ enum class FilePrefetchBufferUsage {
kUnknown,
};
// Implementation:
// FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) with no
// data and bufs_ which contains the prefetched data. Whenever a buffer is
// consumed or is outdated (w.r.t. the requested offset), that buffer is cleared
// and returned to free_bufs_.
//
// If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for
// prefetching.
// num_buffers_ defines how many buffers FilePrefetchBuffer can maintain at a
// time that contains prefetched data with num_buffers_ == bufs_.size() +
// free_bufs_.size().
//
// If num_buffers_ == 1, it's a sequential read flow. Read API will be called on
// that one buffer whenever the data is requested and is not in the buffer.
// If num_buffers_ > 1, then the data is prefetched asynchronously in the
// buffers whenever the data is consumed from the buffers and that buffer is
// freed.
// If num_buffers_ > 1, then requested data can overlap two buffers. To
// return one contiguous buffer, overlap_buf_ is used: the requested data is
// copied from the two buffers to overlap_buf_, which is returned to the
// caller.
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
// Constructor.
//
// All arguments are optional.
// readahead_size : the initial readahead size.
// max_readahead_size : the maximum readahead size.
// If max_readahead_size > readahead_size, the readahead size will be
// doubled on every IO until max_readahead_size is hit.
// Typically this is set as a multiple of readahead_size.
// max_readahead_size should be greater than equal to readahead_size.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always return false, and we only take stats
// for the minimum offset if track_min_offset = true. See below NOTE about
// mmap reads.
// ReadaheadParams : Parameters to control the readahead behavior.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always returns false, and we
// only take stats for the minimum offset if
// track_min_offset = true.
// See below NOTE about mmap reads.
// track_min_offset : Track the minimum offset ever read and collect stats on
// it. Used for adaptable readahead of the file footer/metadata.
// implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after
// doing sequential scans for two times.
// it. Used for adaptable readahead of the file
// footer/metadata.
//
// Automatic readahead is enabled for a file if readahead_size
// and max_readahead_size are passed in.
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
// NOTE: FilePrefetchBuffer is incompatible with prefetching from
// RandomAccessFileReaders using mmap reads, so it is common to use
// `!use_mmap_reads` for the `enable` parameter.
FilePrefetchBuffer(
size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false, uint64_t num_file_reads = 0,
uint64_t num_file_reads_for_auto_readahead = 0, FileSystem* fs = nullptr,
const ReadaheadParams& readahead_params = {}, bool enable = true,
bool track_min_offset = false, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr,
const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
: curr_(0),
readahead_size_(readahead_size),
initial_auto_readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
: readahead_size_(readahead_params.initial_readahead_size),
initial_auto_readahead_size_(readahead_params.initial_readahead_size),
max_readahead_size_(readahead_params.max_readahead_size),
min_offset_read_(std::numeric_limits<size_t>::max()),
enable_(enable),
track_min_offset_(track_min_offset),
implicit_auto_readahead_(implicit_auto_readahead),
implicit_auto_readahead_(readahead_params.implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead),
num_file_reads_(num_file_reads),
num_file_reads_for_auto_readahead_(
readahead_params.num_file_reads_for_auto_readahead),
num_file_reads_(readahead_params.num_file_reads),
explicit_prefetch_submitted_(false),
fs_(fs),
clock_(clock),
stats_(stats),
usage_(usage),
readaheadsize_cb_(cb) {
readaheadsize_cb_(cb),
num_buffers_(readahead_params.num_buffers) {
assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
(num_file_reads_ == 0));
// If ReadOptions.async_io is enabled, data is asynchronously filled in
// second buffer while curr_ is being consumed. If data is overlapping in
// two buffers, data is copied to third buffer to return continuous buffer.
bufs_.resize(3);
for (uint32_t i = 0; i < 2; i++) {
bufs_[i].pos_ = i;
// If num_buffers_ > 1, data is asynchronously filled in the
// queue. As a result, data can overlap in two buffers. It copies the
// data to overlap_buf_ in order to return one continuous buffer.
if (num_buffers_ > 1) {
overlap_buf_ = new BufferInfo();
}
free_bufs_.resize(num_buffers_);
for (uint32_t i = 0; i < num_buffers_; i++) {
free_bufs_[i] = new BufferInfo();
}
}
@ -146,10 +223,9 @@ class FilePrefetchBuffer {
// Abort any pending async read request before destroying the class object.
if (fs_ != nullptr) {
std::vector<void*> handles;
for (uint32_t i = 0; i < 2; i++) {
if (bufs_[i].async_read_in_progress_ &&
bufs_[i].io_handle_ != nullptr) {
handles.emplace_back(bufs_[i].io_handle_);
for (auto& buf : bufs_) {
if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) {
handles.emplace_back(buf->io_handle_);
}
}
if (!handles.empty()) {
@ -157,60 +233,63 @@ class FilePrefetchBuffer {
Status s = fs_->AbortIO(handles);
assert(s.ok());
}
for (auto& buf : bufs_) {
if (buf->io_handle_ != nullptr) {
DestroyAndClearIOHandle(buf);
buf->ClearBuffer();
}
buf->async_read_in_progress_ = false;
}
}
// Prefetch buffer bytes discarded.
uint64_t bytes_discarded = 0;
// Iterated over 2 buffers.
for (int i = 0; i < 2; i++) {
int first = i;
int second = i ^ 1;
if (DoesBufferContainData(first)) {
// If last block was read completely from first and some bytes in
// first buffer are still unconsumed.
if (prev_offset_ >= bufs_[first].offset_ &&
prev_offset_ + prev_len_ <
bufs_[first].offset_ + bufs_[first].buffer_.CurrentSize()) {
bytes_discarded += bufs_[first].buffer_.CurrentSize() -
(prev_offset_ + prev_len_ - bufs_[first].offset_);
// Iterate over buffers.
for (auto& buf : bufs_) {
if (buf->DoesBufferContainData()) {
// If last read was from this block and some bytes are still unconsumed.
if (prev_offset_ >= buf->offset_ &&
prev_offset_ + prev_len_ < buf->offset_ + buf->CurrentSize()) {
bytes_discarded +=
buf->CurrentSize() - (prev_offset_ + prev_len_ - buf->offset_);
}
// If data was in second buffer and some/whole block bytes were read
// from second buffer.
else if (prev_offset_ < bufs_[first].offset_ &&
!DoesBufferContainData(second)) {
// If last block read was completely from different buffer, this
// buffer is unconsumed.
if (prev_offset_ + prev_len_ <= bufs_[first].offset_) {
bytes_discarded += bufs_[first].buffer_.CurrentSize();
}
// If last block read overlaps with this buffer and some data is
// still unconsumed and previous buffer (second) is not cleared.
else if (prev_offset_ + prev_len_ > bufs_[first].offset_ &&
bufs_[first].offset_ + bufs_[first].buffer_.CurrentSize() ==
bufs_[second].offset_) {
bytes_discarded += bufs_[first].buffer_.CurrentSize() -
(/*bytes read from this buffer=*/prev_len_ -
(bufs_[first].offset_ - prev_offset_));
}
// If last read was from previous blocks and this block is unconsumed.
else if (prev_offset_ < buf->offset_ &&
prev_offset_ + prev_len_ <= buf->offset_) {
bytes_discarded += buf->CurrentSize();
}
}
}
for (uint32_t i = 0; i < 2; i++) {
// Release io_handle.
DestroyAndClearIOHandle(i);
}
RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded);
for (auto& buf : bufs_) {
delete buf;
buf = nullptr;
}
for (auto& buf : free_bufs_) {
delete buf;
buf = nullptr;
}
if (overlap_buf_ != nullptr) {
delete overlap_buf_;
overlap_buf_ = nullptr;
}
}
bool Enabled() const { return enable_; }
// Load data into the buffer from a file.
// Called externally by a user to load data into the buffer from a file, with
// num_buffers_ set to the default (1).
//
// opts : the IO options to use.
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
//
Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n);
@ -244,15 +323,11 @@ class FilePrefetchBuffer {
uint64_t offset, size_t n, Slice* result, Status* s,
bool for_compaction = false);
bool TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status);
// The minimum `offset` ever passed to TryReadFromCache(). This will only be
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
size_t GetPrefetchOffset() const { return bufs_[curr_].offset_; }
size_t GetPrefetchOffset() const { return bufs_.front()->offset_; }
// Called in case of implicit auto prefetching.
void UpdateReadPattern(const uint64_t& offset, const size_t& len,
@ -272,6 +347,10 @@ class FilePrefetchBuffer {
void DecreaseReadAheadIfEligible(uint64_t offset, size_t size,
size_t value = DEFAULT_DECREMENT) {
if (bufs_.empty()) {
return;
}
// Decrease the readahead_size if
// - its enabled internally by RocksDB (implicit_auto_readahead_) and,
// - readahead_size is greater than 0 and,
@ -281,11 +360,12 @@ class FilePrefetchBuffer {
// - block is sequential with the previous read and,
// - num_file_reads_ + 1 (including this read) >
// num_file_reads_for_auto_readahead_
size_t curr_size = bufs_[curr_].async_read_in_progress_
? bufs_[curr_].async_req_len_
: bufs_[curr_].buffer_.CurrentSize();
size_t curr_size = bufs_.front()->async_read_in_progress_
? bufs_.front()->async_req_len_
: bufs_.front()->CurrentSize();
if (implicit_auto_readahead_ && readahead_size_ > 0) {
if ((offset + size > bufs_[curr_].offset_ + curr_size) &&
if ((offset + size > bufs_.front()->offset_ + curr_size) &&
IsBlockSequential(offset) &&
(num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) {
readahead_size_ =
@ -298,45 +378,47 @@ class FilePrefetchBuffer {
// Callback function passed to underlying FS in case of asynchronous reads.
void PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg);
void TEST_GetBufferOffsetandSize(uint32_t index, uint64_t& offset,
size_t& len) {
offset = bufs_[index].offset_;
len = bufs_[index].buffer_.CurrentSize();
void TEST_GetBufferOffsetandSize(
std::vector<std::pair<uint64_t, size_t>>& buffer_info) {
for (size_t i = 0; i < bufs_.size(); i++) {
buffer_info[i].first = bufs_[i]->offset_;
buffer_info[i].second = bufs_[i]->async_read_in_progress_
? bufs_[i]->async_req_len_
: bufs_[i]->CurrentSize();
}
}
private:
// Calculates roundoff offset and length to be prefetched based on alignment
// and data present in buffer_. It also allocates a new buffer or refits the
// tail if required.
void CalculateOffsetAndLen(size_t alignment, uint64_t offset,
size_t roundup_len, uint32_t index,
bool refit_tail, uint64_t& chunk_len);
void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset,
size_t roundup_len, bool refit_tail,
uint64_t& aligned_useful_len);
void AbortIOIfNeeded(uint64_t offset);
void AbortOutdatedIO(uint64_t offset);
void AbortAllIOs();
void UpdateBuffersIfNeeded(uint64_t offset, size_t len);
void ClearOutdatedData(uint64_t offset, size_t len);
// It calls Poll API if any there is any pending asynchronous request. It then
// checks if data is in any buffer. It clears the outdated data and swaps the
// buffers if required.
void PollAndUpdateBuffersIfNeeded(uint64_t offset, size_t len);
// It calls the Poll API to check for any pending asynchronous request.
void PollIfNeeded(uint64_t offset, size_t len);
Status PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
bool& copy_to_third_buffer);
Status PrefetchInternal(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t length, size_t readahead_size,
bool& copy_to_third_buffer);
Status Read(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t read_len, uint64_t chunk_len, uint64_t start_offset,
uint32_t index);
Status Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t aligned_useful_len, uint64_t start_offset);
Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t read_len, uint64_t start_offset, uint32_t index);
Status ReadAsync(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t start_offset);
// Copy the data from src to third buffer.
void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length);
// Copy the data from src to overlap_buf_.
void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length);
bool IsBlockSequential(const size_t& offset) {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
@ -372,64 +454,24 @@ class FilePrefetchBuffer {
return true;
}
// Helper functions.
bool IsDataBlockInBuffer(uint64_t offset, size_t length, uint32_t index) {
return (offset >= bufs_[index].offset_ &&
offset + length <=
bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
}
bool IsOffsetInBuffer(uint64_t offset, uint32_t index) {
return (offset >= bufs_[index].offset_ &&
offset < bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
}
bool DoesBufferContainData(uint32_t index) {
return bufs_[index].buffer_.CurrentSize() > 0;
}
bool IsBufferOutdated(uint64_t offset, uint32_t index) {
return (
!bufs_[index].async_read_in_progress_ && DoesBufferContainData(index) &&
offset >= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
}
bool IsBufferOutdatedWithAsyncProgress(uint64_t offset, uint32_t index) {
return (bufs_[index].async_read_in_progress_ &&
bufs_[index].io_handle_ != nullptr &&
offset >= bufs_[index].offset_ + bufs_[index].async_req_len_);
}
bool IsOffsetInBufferWithAsyncProgress(uint64_t offset, uint32_t index) {
return (bufs_[index].async_read_in_progress_ &&
offset >= bufs_[index].offset_ &&
offset < bufs_[index].offset_ + bufs_[index].async_req_len_);
}
bool IsSecondBuffEligibleForPrefetching() {
uint32_t second = curr_ ^ 1;
if (bufs_[second].async_read_in_progress_) {
bool IsEligibleForFurtherPrefetching() {
if (free_bufs_.empty()) {
return false;
}
assert(!bufs_[curr_].async_read_in_progress_);
if (DoesBufferContainData(curr_) && DoesBufferContainData(second) &&
(bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() ==
bufs_[second].offset_)) {
return false;
}
// Readahead size can be 0 because of trimming.
if (readahead_size_ == 0) {
return false;
}
bufs_[second].ClearBuffer();
return true;
}
void DestroyAndClearIOHandle(uint32_t index) {
if (bufs_[index].io_handle_ != nullptr && bufs_[index].del_fn_ != nullptr) {
bufs_[index].del_fn_(bufs_[index].io_handle_);
bufs_[index].io_handle_ = nullptr;
bufs_[index].del_fn_ = nullptr;
void DestroyAndClearIOHandle(BufferInfo* buf) {
if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
buf->del_fn_(buf->io_handle_);
buf->io_handle_ = nullptr;
buf->del_fn_ = nullptr;
}
bufs_[index].async_read_in_progress_ = false;
buf->async_read_in_progress_ = false;
}
Status HandleOverlappingData(const IOOptions& opts,
@ -444,17 +486,12 @@ class FilePrefetchBuffer {
Status* s,
bool for_compaction = false);
bool TryReadFromCacheAsyncUntracked(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result,
Status* status);
void ReadAheadSizeTuning(bool read_curr_block, bool refit_tail,
uint64_t prev_buf_end_offset, uint32_t index,
void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block,
bool refit_tail, uint64_t prev_buf_end_offset,
size_t alignment, size_t length,
size_t readahead_size, uint64_t& offset,
uint64_t& end_offset, size_t& read_len,
uint64_t& chunk_len);
uint64_t& aligned_useful_len);
void UpdateStats(bool found_in_buffer, size_t length_found) {
if (found_in_buffer) {
@ -472,10 +509,78 @@ class FilePrefetchBuffer {
}
}
std::vector<BufferInfo> bufs_;
// curr_ represents the index for bufs_ indicating which buffer is being
// consumed currently.
uint32_t curr_;
Status PrefetchRemBuffers(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t end_offset1, size_t alignment,
size_t readahead_size);
// *** BEGIN APIs related to allocating and freeing buffers ***
bool IsBufferQueueEmpty() { return bufs_.empty(); }
BufferInfo* GetFirstBuffer() { return bufs_.front(); }
BufferInfo* GetLastBuffer() { return bufs_.back(); }
size_t NumBuffersAllocated() { return bufs_.size(); }
void AllocateBuffer() {
assert(!free_bufs_.empty());
BufferInfo* buf = free_bufs_.front();
free_bufs_.pop_front();
bufs_.emplace_back(buf);
}
void AllocateBufferIfEmpty() {
if (bufs_.empty()) {
AllocateBuffer();
}
}
void FreeFrontBuffer() {
BufferInfo* buf = bufs_.front();
buf->ClearBuffer();
bufs_.pop_front();
free_bufs_.emplace_back(buf);
}
void FreeLastBuffer() {
BufferInfo* buf = bufs_.back();
buf->ClearBuffer();
bufs_.pop_back();
free_bufs_.emplace_back(buf);
}
void FreeAllBuffers() {
// Drain bufs_ with an explicit loop; calling pop_front() inside a range-for
// over bufs_ would invalidate the loop iterator.
while (!bufs_.empty()) {
BufferInfo* buf = bufs_.front();
bufs_.pop_front();
buf->ClearBuffer();
free_bufs_.emplace_back(buf);
}
}
void FreeEmptyBuffers() {
if (bufs_.empty()) {
return;
}
std::deque<BufferInfo*> tmp_buf;
while (!bufs_.empty()) {
BufferInfo* buf = bufs_.front();
bufs_.pop_front();
if (buf->async_read_in_progress_ || buf->DoesBufferContainData()) {
tmp_buf.emplace_back(buf);
} else {
free_bufs_.emplace_back(buf);
}
}
bufs_ = tmp_buf;
}
// *** END APIs related to allocating and freeing buffers ***
std::deque<BufferInfo*> bufs_;
std::deque<BufferInfo*> free_bufs_;
BufferInfo* overlap_buf_ = nullptr;
size_t readahead_size_;
size_t initial_auto_readahead_size_;
@ -503,7 +608,7 @@ class FilePrefetchBuffer {
uint64_t num_file_reads_;
// If explicit_prefetch_submitted_ is set then it indicates RocksDB called
// PrefetchAsync to submit request. It needs to call TryReadFromCacheAsync to
// PrefetchAsync to submit request. It needs to call TryReadFromCache to
// poll the submitted request without checking if data is sequential and
// num_file_reads_.
bool explicit_prefetch_submitted_;
@ -515,5 +620,9 @@ class FilePrefetchBuffer {
FilePrefetchBufferUsage usage_;
std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_;
// num_buffers_ is the number of buffers maintained by FilePrefetchBuffer to
// hold prefetched data at a time.
size_t num_buffers_;
};
} // namespace ROCKSDB_NAMESPACE


@ -1637,7 +1637,6 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_async_prefetch_count = 0;
int buff_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
@ -1650,10 +1649,6 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_async_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* /*arg*/) { read_async_called = true; });
@ -1709,7 +1704,7 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
if (read_async_called) {
ASSERT_GT(buff_async_prefetch_count, 0);
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
@ -1953,7 +1948,7 @@ TEST_P(PrefetchTest1, SeekWithExtraPrefetchAsyncIO) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
Close();
int buff_prefetch_count = 0, extra_prefetch_buff_cnt = 0;
for (size_t i = 0; i < 3; i++) {
table_options.num_file_reads_for_auto_readahead = i;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -1961,14 +1956,14 @@ TEST_P(PrefetchTest1, SeekWithExtraPrefetchAsyncIO) {
s = TryReopen(options);
ASSERT_OK(s);
int buff_prefetch_count = 0;
int extra_prefetch_buff_cnt = 0;
buff_prefetch_count = 0;
extra_prefetch_buff_cnt = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching",
[&](void*) { extra_prefetch_buff_cnt++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
@ -2287,12 +2282,6 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
int buff_prefetch_count = 0;
int buff_prefetch_async_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_async_count++; });
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
@ -2339,7 +2328,7 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
// not all platforms support io_uring. In that case it'll fallback to
// normal prefetching without async_io.
if (read_async_called) {
ASSERT_EQ(buff_prefetch_async_count, 2);
ASSERT_EQ(buff_prefetch_count, 2);
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
@ -2431,9 +2420,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
@ -2455,7 +2443,6 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
if (read_async_called) {
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
@ -2469,8 +2456,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(num_keys, total_keys);
ASSERT_EQ(buff_prefetch_count, 0);
}
ASSERT_GT(buff_prefetch_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
@ -2558,9 +2545,8 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
@ -2686,9 +2672,8 @@ TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack(
@ -2808,9 +2793,8 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
@ -2927,7 +2911,11 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
FilePrefetchBuffer fpb(16384, 16384, true, false, false, 0, 0, fs());
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
FilePrefetchBuffer fpb(readahead_params, true, false, fs());
Slice result;
// Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
// it will do two reads of 4096+8192 and 8192
@ -2945,14 +2933,13 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
// 16384
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(
fpb.TryReadFromCacheAsync(io_opts, r.get(), 8192, 8192, &result, &s));
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), 8192, 8192, &result, &s));
}
// Test to ensure when PrefetchAsync is called during seek, it doesn't do any
// alignment or prefetch extra if readahead is not enabled during seek.
TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
std::string fname = "seek-wwithout-alignment";
std::string fname = "seek-without-alignment";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
@ -2973,10 +2960,16 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
// Without readahead enabled, there will be no alignment and offset of buffer
// will be n.
{
FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/true,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2, fs());
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.implicit_auto_readahead = true;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
Slice result;
// Simulate a seek of half of alignment bytes at offset n. Due to the
@ -2993,7 +2986,7 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCacheAsync(io_opts, r.get(), n, n, &result, &s));
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), n, n, &result, &s));
if (read_async_called) {
ASSERT_EQ(fpb.GetPrefetchOffset(), n);
@ -3004,10 +2997,14 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
// buffer will be 0.
{
read_async_called = false;
FilePrefetchBuffer fpb(
/*readahead_size=*/16384, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2, fs());
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
Slice result;
// Simulate a seek of half of alignment bytes at offset n.
@ -3022,7 +3019,7 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCacheAsync(io_opts, r.get(), n, n, &result, &s));
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), n, n, &result, &s));
if (read_async_called) {
ASSERT_EQ(fpb.GetPrefetchOffset(), 0);
@ -3040,10 +3037,13 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, fs());
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
@ -3064,8 +3064,8 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
ASSERT_TRUE(s.IsTryAgain());
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCacheAsync(io_opts, r.get(), /*offset=*/3000,
/*length=*/4000, &async_result, &s));
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), /*offset=*/3000,
/*length=*/4000, &async_result, &s));
// No sync call should be made.
HistogramData sst_read_micros;
stats()->histogramData(SST_READ_MICROS, &sst_read_micros);
@ -3092,11 +3092,14 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
FilePrefetchBuffer fpb(8192, 8192, true, false, false, 0, 0, fs(), nullptr,
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
FilePrefetchBuffer fpb(readahead_params, true, false, fs(), nullptr,
stats.get());
Slice result;
// Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
// it will do two reads of 4096+8192 and 8192
// it will do a read at offset 0 of length (4096 + 8192) = 12288.
Status s;
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s));
ASSERT_EQ(s, Status::OK());
@ -3105,8 +3108,8 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
// Simulate a block cache hit
fpb.UpdateReadPattern(4096, 4096, false);
// Now read some data that straddles the two prefetch buffers - offset 8192 to
// 16384
// Now read some data that will prefetch additional data from 12288 to 16384
// (4096 bytes) + 8192 (readahead_size).
ASSERT_TRUE(
fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result, &s));
ASSERT_EQ(s, Status::OK());


@ -384,7 +384,8 @@ void BlockBasedTableIterator::InitDataBlock() {
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction,
/*no_sequential_checking=*/false, read_options_, readaheadsize_cb);
/*no_sequential_checking=*/false, read_options_, readaheadsize_cb,
read_options_.async_io);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
@ -444,7 +445,7 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
read_options_, readaheadsize_cb);
read_options_, readaheadsize_cb, read_options_.async_io);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
@ -712,7 +713,6 @@ void BlockBasedTableIterator::InitializeStartAndEndOffsets(
// It can be due to a read error in the second buffer in FilePrefetchBuffer.
// BlockHandles were already added to the queue but there was an error in
// fetching those data blocks. So in this call they need to be read again.
assert(block_handles_.front().is_cache_hit_ == false);
found_first_miss_block = true;
// Initialize prev_handles_size to 0 as all those handles need to be read
// again.
@ -855,7 +855,8 @@ void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
auto it_end =
block_handles_.rbegin() + (block_handles_.size() - prev_handles_size);
while (it != it_end && (*it).is_cache_hit_) {
while (it != it_end && (*it).is_cache_hit_ &&
start_updated_offset != (*it).handle_.offset()) {
it++;
}
end_updated_offset = (*it).handle_.offset() + footer + (*it).handle_.size();


@ -613,8 +613,7 @@ Status BlockBasedTable::Open(
} else {
// Should not prefetch for mmap mode.
prefetch_buffer.reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */,
true /* track_min_offset */));
ReadaheadParams(), false /* enable */, true /* track_min_offset */));
}
// Read in the following order:
@ -876,17 +875,14 @@ Status BlockBasedTable::PrefetchTail(
if (s.ok() && !file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(opts, prefetch_off, prefetch_len).IsNotSupported()) {
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */));
ReadaheadParams(), false /* enable */, true /* track_min_offset */));
return Status::OK();
}
}
// Use `FilePrefetchBuffer`
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */,
true /* track_min_offset */, false /* implicit_auto_readahead */,
0 /* num_file_reads */, 0 /* num_file_reads_for_auto_readahead */,
ReadaheadParams(), true /* enable */, true /* track_min_offset */,
nullptr /* fs */, nullptr /* clock */, stats,
/* readahead_cb */ nullptr,
FilePrefetchBufferUsage::kTableOpenPrefetchTail));
@ -2499,10 +2495,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
: rep_->table_options.max_auto_readahead_size;
// FilePrefetchBuffer doesn't work in mmap mode and readahead is not
// needed there.
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size;
readahead_params.max_readahead_size = readahead_size;
FilePrefetchBuffer prefetch_buffer(
readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!rep_->ioptions.allow_mmap_reads /* enable */);
readahead_params, !rep_->ioptions.allow_mmap_reads /* enable */);
for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
s = index_iter->status();


@ -696,32 +696,23 @@ struct BlockBasedTable::Rep {
return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX;
}
void CreateFilePrefetchBuffer(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
FilePrefetchBufferUsage usage) const {
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
implicit_auto_readahead, num_file_reads,
num_file_reads_for_auto_readahead, ioptions.fs.get(), ioptions.clock,
readahead_params, !ioptions.allow_mmap_reads /* enable */,
false /* track_min_offset */, ioptions.fs.get(), ioptions.clock,
ioptions.stats, readaheadsize_cb, usage));
}
void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
implicit_auto_readahead, num_file_reads,
num_file_reads_for_auto_readahead,
readaheadsize_cb, usage);
CreateFilePrefetchBuffer(readahead_params, fpb, readaheadsize_cb, usage);
}
}


@ -16,7 +16,13 @@ void BlockPrefetcher::PrefetchIfNeeded(
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
const size_t readahead_size, bool is_for_compaction,
const bool no_sequential_checking, const ReadOptions& read_options,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb) {
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
bool is_async_io_prefetch) {
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size;
readahead_params.max_readahead_size = readahead_size;
readahead_params.num_buffers = is_async_io_prefetch ? 2 : 1;
const size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
const size_t offset = handle.offset();
if (is_for_compaction) {
@ -44,20 +50,18 @@ void BlockPrefetcher::PrefetchIfNeeded(
//
// num_file_reads is used by FilePrefetchBuffer only when
// implicit_auto_readahead is set.
rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_,
&prefetch_buffer_, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0,
/*readaheadsize_cb=*/nullptr);
readahead_params.initial_readahead_size = compaction_readahead_size_;
readahead_params.max_readahead_size = compaction_readahead_size_;
rep->CreateFilePrefetchBufferIfNotExists(readahead_params,
&prefetch_buffer_,
/*readaheadsize_cb=*/nullptr);
return;
}
// Explicit user requested readahead.
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(
readahead_size, readahead_size, &prefetch_buffer_,
/*implicit_auto_readahead=*/false, /*num_file_reads=*/0,
/*num_file_reads_for_auto_readahead=*/0, readaheadsize_cb,
readahead_params, &prefetch_buffer_, readaheadsize_cb,
/*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
return;
}
@ -75,14 +79,17 @@ void BlockPrefetcher::PrefetchIfNeeded(
initial_auto_readahead_size_ = max_auto_readahead_size;
}
readahead_params.initial_readahead_size = initial_auto_readahead_size_;
readahead_params.max_readahead_size = max_auto_readahead_size;
readahead_params.implicit_auto_readahead = true;
readahead_params.num_file_reads_for_auto_readahead =
rep->table_options.num_file_reads_for_auto_readahead;
// In case of no_sequential_checking, it will skip checking num_file_reads_
// and will always create the FilePrefetchBuffer.
if (no_sequential_checking) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true,
/*num_file_reads=*/0,
rep->table_options.num_file_reads_for_auto_readahead, readaheadsize_cb,
readahead_params, &prefetch_buffer_, readaheadsize_cb,
/*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
return;
}
@ -109,11 +116,10 @@ void BlockPrefetcher::PrefetchIfNeeded(
return;
}
readahead_params.num_file_reads = num_file_reads_;
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead, readaheadsize_cb,
readahead_params, &prefetch_buffer_, readaheadsize_cb,
/*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
return;
}
@ -133,9 +139,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead, readaheadsize_cb,
readahead_params, &prefetch_buffer_, readaheadsize_cb,
/*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
return;
}


@ -22,7 +22,8 @@ class BlockPrefetcher {
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
size_t readahead_size, bool is_for_compaction,
const bool no_sequential_checking, const ReadOptions& read_options,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb);
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
bool is_async_io_prefetch);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
void UpdateReadPattern(const uint64_t& offset, const size_t& len) {


@ -495,11 +495,9 @@ Status PartitionedFilterBlockReader::CacheDependencies(
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
rep->CreateFilePrefetchBuffer(
0, 0, &prefetch_buffer, false /* Implicit autoreadahead */,
0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/,
/*readaheadsize_cb*/ nullptr,
/*usage=*/FilePrefetchBufferUsage::kUnknown);
rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
/*readaheadsize_cb*/ nullptr,
/*usage=*/FilePrefetchBufferUsage::kUnknown);
IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts);


@ -92,7 +92,7 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
block_prefetcher_.PrefetchIfNeeded(
rep, partitioned_index_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/false, read_options_,
/*readaheadsize_cb=*/nullptr);
/*readaheadsize_cb=*/nullptr, /*is_async_io_prefetch=*/false);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(
read_options_, partitioned_index_handle, &block_iter_,


@ -167,11 +167,9 @@ Status PartitionIndexReader::CacheDependencies(
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
rep->CreateFilePrefetchBuffer(
0, 0, &prefetch_buffer, false /*Implicit auto readahead*/,
0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/,
/*readaheadsize_cb*/ nullptr,
/*usage=*/FilePrefetchBufferUsage::kUnknown);
rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
/*readaheadsize_cb*/ nullptr,
/*usage=*/FilePrefetchBufferUsage::kUnknown);
IOOptions opts;
{
Status s = rep->file->PrepareIOOptions(ro, opts);


@ -76,16 +76,9 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
IOOptions opts;
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
if (io_s.ok()) {
bool read_from_prefetch_buffer = false;
if (read_options_.async_io && !for_compaction_) {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s);
} else {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, for_compaction_);
}
bool read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, for_compaction_);
if (read_from_prefetch_buffer) {
ProcessTrailerIfPresent();
if (!io_status_.ok()) {


@ -103,9 +103,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
file_.reset(new RandomAccessFileReader(std::move(file), file_path));
FilePrefetchBuffer prefetch_buffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
!fopts.use_mmap_reads /* enable */, false /* track_min_offset */);
FilePrefetchBuffer prefetch_buffer(ReadaheadParams(),
!fopts.use_mmap_reads /* enable */,
false /* track_min_offset */);
if (s.ok()) {
const uint64_t kSstDumpTailPrefetchSize = 512 * 1024;
uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize)


@ -3222,8 +3222,6 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
Slice* ub_ptr = &ub;
read_options.iterate_upper_bound = ub_ptr;
read_options.readahead_size = 16384;
uint64_t buffer_offset;
size_t buffer_len;
// Test various functionalities -
// 5 blocks prefetched - Current + 4 additional (readahead_size).
@ -3255,13 +3253,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
FilePrefetchBuffer* prefetch_buffer =
(reinterpret_cast<BlockBasedTableIterator*>(iter.get()))
->prefetch_buffer();
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
std::vector<std::pair<uint64_t, size_t>> buffer_info(1);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle);
// It won't prefetch the data on a cache hit.
// One block data.
ASSERT_EQ(buffer_len, 4096);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 4096);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3292,14 +3291,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
FilePrefetchBuffer* prefetch_buffer =
(reinterpret_cast<BlockBasedTableIterator*>(iter.get()))
->prefetch_buffer();
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
std::vector<std::pair<uint64_t, size_t>> buffer_info(1);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle);
// It won't prefetch the data on a cache hit.
// 3 blocks data.
ASSERT_EQ(buffer_len, 12288);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 12288);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
for (; kv_iter != kvmap.end() && iter->Valid(); kv_iter++) {
ASSERT_EQ(iter->key(), kv_iter->first);
@ -3313,11 +3312,10 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
}
// Second Prefetch.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle);
ASSERT_EQ(buffer_offset, 106496);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 20480);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3366,8 +3364,6 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
read_options.iterate_upper_bound = ub_ptr;
read_options.readahead_size = 16384;
read_options.async_io = true;
uint64_t buffer_offset;
size_t buffer_len;
// Test Various functionalities -
// 3 blocks prefetched - Current + 2 additional (readahead_size/2).
@ -3403,14 +3399,13 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
FilePrefetchBuffer* prefetch_buffer =
(reinterpret_cast<BlockBasedTableIterator*>(iter.get()))
->prefetch_buffer();
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle);
ASSERT_EQ(buffer_len, 4096);
ASSERT_EQ(buffer_offset, block_handle.offset());
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
ASSERT_EQ(buffer_len, 0);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 4096);
ASSERT_EQ(buffer_info[1].second, 0);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
2);
@ -3443,23 +3438,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
FilePrefetchBuffer* prefetch_buffer =
(reinterpret_cast<BlockBasedTableIterator*>(iter.get()))
->prefetch_buffer();
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
{
// 1st Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
block_handle);
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 8192);
// 2nd Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
InternalKey ikey_tmp("00000360", 0, kTypeValue);
bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
block_handle);
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[1].first, block_handle.offset());
ASSERT_EQ(buffer_info[1].second, 8192);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3496,23 +3489,23 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
FilePrefetchBuffer* prefetch_buffer =
(reinterpret_cast<BlockBasedTableIterator*>(iter.get()))
->prefetch_buffer();
{
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
// 1st Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
block_handle);
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 8192);
// 2nd Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
InternalKey ikey_tmp("00000540", 0, kTypeValue);
bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
block_handle);
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[1].first, block_handle.offset());
ASSERT_EQ(buffer_info[1].second, 8192);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3532,23 +3525,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
}
{
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
// 1st Buffer Verification.
// curr buffer - 1.
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
block_handle);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 8192);
// 2nd Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
InternalKey ikey_tmp("00000585", 0, kTypeValue);
bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
block_handle);
ASSERT_EQ(buffer_len, 4096);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[1].first, block_handle.offset());
ASSERT_EQ(buffer_info[1].second, 4096);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3568,23 +3559,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
}
{
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
// 1st Buffer Verification.
// curr buffer - 0.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
block_handle);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_len, 4096);
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 4096);
// 2nd Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
InternalKey ikey_tmp("00000615", 0, kTypeValue);
bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
block_handle);
ASSERT_EQ(buffer_len, 4096);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[1].first, block_handle.offset());
ASSERT_EQ(buffer_info[1].second, 4096);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
1);
@ -3604,23 +3593,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
}
{
std::vector<std::pair<uint64_t, size_t>> buffer_info(2);
prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info);
// 1st Buffer Verification.
// curr_ - 1.
prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
buffer_len);
ASSERT_EQ(buffer_len, 4096);
bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
block_handle);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[0].first, block_handle.offset());
ASSERT_EQ(buffer_info[0].second, 4096);
// 2nd Buffer Verification.
prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
buffer_len);
InternalKey ikey_tmp("00000630", 0, kTypeValue);
bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
block_handle);
ASSERT_EQ(buffer_len, 8192);
ASSERT_EQ(buffer_offset, block_handle.offset());
ASSERT_EQ(buffer_info[1].first, block_handle.offset());
ASSERT_EQ(buffer_info[1].second, 8192);
ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED),
0);
@ -5889,8 +5876,8 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {
TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
TailPrefetchStats tpstats;
FilePrefetchBuffer buffer(0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */);
FilePrefetchBuffer buffer(ReadaheadParams(), false /* enable */,
true /* track_min_offset */);
IOOptions opts;
buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */,
10 /* n */, nullptr /* result */,