From 26b480609c4cd69784d731012a7d4ee6ce023c5c Mon Sep 17 00:00:00 2001
From: Andrew Chang
Date: Thu, 21 Nov 2024 12:32:13 -0800
Subject: [PATCH] Update FilePrefetchBuffer::Read to reuse file system buffer when possible (#13118)

Summary:
This PR adds support for reusing the file system provided buffer to avoid an
extra `memcpy` into RocksDB's buffer. This optimization has already been
implemented for point lookups, as well as compaction and scan reads _when
prefetching is disabled_.

This PR extends this optimization to work with synchronous prefetching
(`num_buffers == 1`). Asynchronous prefetching can be addressed in a future PR
(and probably should be, to keep this PR from growing too large).

Remarks
- To handle the case where the main buffer only has part of the requested
  data, I used the existing `overlap_buf_` (currently used in the async
  prefetching case) instead of defining a separate buffer. This was discussed
  in https://github.com/facebook/rocksdb/pull/13118#discussion_r1842839360.
- We use `MultiRead` with a single request to take advantage of the file
  system buffer. This is consistent with previous work (e.g.
  https://github.com/facebook/rocksdb/pull/12266).
- Even without the tests I added, there was already some code coverage in at
  least `DBIOCorruptionTest.IterReadCorruptionRetry`, since those tests were
  failing before I addressed a bug in my code for this PR. [Run with failed
  test](https://github.com/facebook/rocksdb/actions/runs/11708830448/job/32611508818?pr=13118).
- This prefetching code is not too easy to follow, so I added quite a few
  comments to both the code and the test cases to try to make it easier to
  understand the exact internal state of the prefetch buffer at every point in
  time.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13118

Test Plan:
I wrote pretty thorough unit tests that cover synchronous prefetching with
file system buffer reuse. The flows for partial hits, complete hits, and
complete misses are tested. I also parametrized the test to make sure the
async prefetching (without file system buffer reuse) still works as expected.
Once we agree on the changes, I will run a long stress test before merging.

Reviewed By: anand1976

Differential Revision: D65559101

Pulled By: archang19

fbshipit-source-id: 1a56d846e918c20a009b83f1371c1791f69849ae
---
 file/file_prefetch_buffer.cc                  | 166 +++++-
 file/file_prefetch_buffer.h                   | 111 ++++-
 file/prefetch_test.cc                         | 420 ++++++++++++++++++
 file/random_access_file_reader.cc             |   7 +-
 file/random_access_file_reader.h              |   2 +-
 table/table_test.cc                           |  74 +--
 .../reuse_file_system_buffer_prefetch.md      |   1 +
 util/aligned_buffer.h                         |  25 +-
 8 files changed, 693 insertions(+), 113 deletions(-)
 create mode 100644 unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md

diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 73050aed79..764f450c03 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -22,11 +22,9 @@ namespace ROCKSDB_NAMESPACE {
-void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
-                                              uint64_t offset,
-                                              size_t roundup_len,
-                                              bool refit_tail,
-                                              uint64_t& aligned_useful_len) {
+void FilePrefetchBuffer::PrepareBufferForRead(
+    BufferInfo* buf, size_t alignment, uint64_t offset, size_t roundup_len,
+    bool refit_tail, bool use_fs_buffer, uint64_t& aligned_useful_len) {
   uint64_t aligned_useful_offset_in_buf = 0;
   bool copy_data_to_new_buffer = false;
   // Check if requested bytes are in the existing buffer_.
@@ -39,6 +37,9 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment, // new buffer is created. aligned_useful_offset_in_buf = Rounddown(static_cast(offset - buf->offset_), alignment); + // aligned_useful_len is passed by reference and used to calculate how much + // data needs to be read, so it is needed regardless of whether + // use_fs_buffer is true aligned_useful_len = static_cast(buf->CurrentSize()) - aligned_useful_offset_in_buf; assert(aligned_useful_offset_in_buf % alignment == 0); @@ -53,6 +54,16 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment, } } + // The later buffer allocation / tail refitting does not apply when + // use_fs_buffer is true. If we allocate a new buffer, we end up throwing it + // away later when we reuse the file system allocated buffer. If we refit + // the tail in the main buffer, we don't have a place to put the next chunk of + // data provided by the file system (without performing another copy, which we + // are trying to avoid in the first place) + if (use_fs_buffer) { + return; + } + // Create a new buffer only if current capacity is not sufficient, and memcopy // bytes from old buffer if needed (i.e., if aligned_useful_len is greater // than 0). @@ -62,8 +73,8 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment, static_cast(roundup_len), copy_data_to_new_buffer, aligned_useful_offset_in_buf, static_cast(aligned_useful_len)); } else if (aligned_useful_len > 0 && refit_tail) { - // New buffer not needed. But memmove bytes from tail to the beginning since - // aligned_useful_len is greater than 0. + // New buffer not needed. But memmove bytes from tail to the beginning + // since aligned_useful_len is greater than 0. buf->buffer_.RefitTail(static_cast(aligned_useful_offset_in_buf), static_cast(aligned_useful_len)); } else if (aligned_useful_len > 0) { @@ -84,9 +95,18 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, uint64_t read_len, uint64_t aligned_useful_len, uint64_t start_offset) { Slice result; - char* to_buf = buf->buffer_.BufferStart() + aligned_useful_len; - Status s = reader->Read(opts, start_offset + aligned_useful_len, read_len, - &result, to_buf, /*aligned_buf=*/nullptr); + Status s; + char* to_buf = nullptr; + bool use_fs_buffer = UseFSBuffer(reader); + if (use_fs_buffer) { + s = FSBufferDirectRead(reader, buf, opts, start_offset + aligned_useful_len, + read_len, result); + } else { + to_buf = buf->buffer_.BufferStart() + aligned_useful_len; + s = reader->Read(opts, start_offset + aligned_useful_len, read_len, &result, + to_buf, /*aligned_buf=*/nullptr); + } + #ifndef NDEBUG if (result.size() < read_len) { // Fake an IO error to force db_stress fault injection to ignore @@ -97,7 +117,7 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, if (!s.ok()) { return s; } - if (result.data() != to_buf) { + if (!use_fs_buffer && result.data() != to_buf) { // If the read is coming from some other buffer already in memory (such as // mmap) then it would be inefficient to create another copy in this // FilePrefetchBuffer. The caller is expected to exclude this case. @@ -108,8 +128,11 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) { RecordTick(stats_, PREFETCH_BYTES, read_len); } - // Update the buffer size. - buf->buffer_.Size(static_cast(aligned_useful_len) + result.size()); + if (!use_fs_buffer) { + // Update the buffer size. 
+ // We already explicitly set the buffer size when we reuse the FS buffer + buf->buffer_.Size(static_cast(aligned_useful_len) + result.size()); + } return s; } @@ -157,13 +180,14 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, return Status::OK(); } - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0; size_t read_len = 0; + bool use_fs_buffer = UseFSBuffer(reader); ReadAheadSizeTuning(buf, /*read_curr_block=*/true, - /*refit_tail=*/true, rounddown_offset, alignment, 0, n, - rounddown_offset, roundup_end, read_len, + /*refit_tail=*/true, use_fs_buffer, rounddown_offset, + alignment, 0, n, rounddown_offset, roundup_end, read_len, aligned_useful_len); Status s; @@ -178,12 +202,13 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, } // Copy data from src to overlap_buf_. -void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, - size_t& length) { +void FilePrefetchBuffer::CopyDataToOverlapBuffer(BufferInfo* src, + uint64_t& offset, + size_t& length) { if (length == 0) { return; } - + assert(src->IsOffsetInBuffer(offset)); uint64_t copy_offset = (offset - src->offset_); size_t copy_len = 0; if (src->IsDataBlockInBuffer(offset, length)) { @@ -194,10 +219,8 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, } BufferInfo* dst = overlap_buf_; - memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(), - src->buffer_.BufferStart() + copy_offset, copy_len); - - dst->buffer_.Size(dst->CurrentSize() + copy_len); + assert(copy_len <= dst->buffer_.Capacity() - dst->buffer_.CurrentSize()); + dst->buffer_.Append(src->buffer_.BufferStart() + copy_offset, copy_len); // Update offset and length. offset += copy_len; @@ -208,6 +231,7 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, if (length > 0) { FreeFrontBuffer(); } + TEST_SYNC_POINT("FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete"); } // Clear the buffers if it contains outdated data. Outdated data can be because @@ -355,7 +379,7 @@ void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) { // of ReadAsync to make sure it doesn't read anything from // previous buffer which is already prefetched. void FilePrefetchBuffer::ReadAheadSizeTuning( - BufferInfo* buf, bool read_curr_block, bool refit_tail, + BufferInfo* buf, bool read_curr_block, bool refit_tail, bool use_fs_buffer, uint64_t prev_buf_end_offset, size_t alignment, size_t length, size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset, size_t& read_len, uint64_t& aligned_useful_len) { @@ -408,7 +432,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( uint64_t roundup_len = end_offset - start_offset; PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail, - aligned_useful_len); + use_fs_buffer, aligned_useful_len); assert(roundup_len >= aligned_useful_len); // Update the buffer offset. @@ -422,11 +446,43 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( (end_offset - start_offset)); } +// This is for when num_buffers_ = 1. 
+// If we are reusing the file system allocated buffer, and only some of the +// requested data is in the buffer, we copy the relevant data to overlap_buf_ +void FilePrefetchBuffer::HandleOverlappingSyncData(uint64_t offset, + size_t length, + uint64_t& tmp_offset, + size_t& tmp_length, + bool& use_overlap_buffer) { + if (IsBufferQueueEmpty()) { + return; + } + BufferInfo* buf = GetFirstBuffer(); + // We should only be calling this when num_buffers_ = 1, so there should + // not be any async reads. + assert(!buf->async_read_in_progress_); + + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsOffsetInBuffer(offset) && + buf->offset_ + buf->CurrentSize() < offset + length) { + // Allocated overlap_buf_ is just enough to hold the result for the user + // Alignment does not matter here + use_overlap_buffer = true; + overlap_buf_->ClearBuffer(); + overlap_buf_->buffer_.Alignment(1); + overlap_buf_->buffer_.AllocateNewBuffer(length); + overlap_buf_->offset_ = offset; + CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length); + UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize()); + } +} + +// This is for when num_buffers_ > 1. // If data is overlapping between two buffers then during this call: // - data from first buffer is copied into overlapping buffer, // - first is removed from bufs_ and freed so that it can be used for async // prefetching of further data. -Status FilePrefetchBuffer::HandleOverlappingData( +Status FilePrefetchBuffer::HandleOverlappingAsyncData( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t length, size_t readahead_size, bool& copy_to_overlap_buffer, uint64_t& tmp_offset, size_t& tmp_length) { @@ -436,7 +492,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( } Status s; - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); BufferInfo* buf = GetFirstBuffer(); @@ -470,7 +526,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( overlap_buf_->offset_ = offset; copy_to_overlap_buffer = true; - CopyDataToBuffer(buf, tmp_offset, tmp_length); + CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length); UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize()); // Call async prefetching on freed buffer since data has been consumed @@ -495,8 +551,8 @@ Status FilePrefetchBuffer::HandleOverlappingData( uint64_t end_offset = start_offset, aligned_useful_len = 0; ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, - /*refit_tail=*/false, next_buf->offset_ + second_size, - alignment, + /*refit_tail=*/false, /*use_fs_buffer=*/false, + next_buf->offset_ + second_size, alignment, /*length=*/0, readahead_size, start_offset, end_offset, read_len, aligned_useful_len); if (read_len > 0) { @@ -537,7 +593,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); Status s; uint64_t tmp_offset = offset; size_t tmp_length = length; @@ -550,12 +606,20 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, } ClearOutdatedData(offset, length); - // Handle overlapping data over two buffers. - s = HandleOverlappingData(opts, reader, offset, length, readahead_size, - copy_to_overlap_buffer, tmp_offset, tmp_length); + // Handle overlapping data over two buffers (async prefetching case). 
+ s = HandleOverlappingAsyncData(opts, reader, offset, length, readahead_size, + copy_to_overlap_buffer, tmp_offset, + tmp_length); if (!s.ok()) { return s; } + // Handle partially available data when reusing the file system buffer + // and num_buffers_ = 1 (sync prefetching case) + bool use_fs_buffer = UseFSBuffer(reader); + if (!copy_to_overlap_buffer && use_fs_buffer) { + HandleOverlappingSyncData(offset, length, tmp_offset, tmp_length, + copy_to_overlap_buffer); + } AllocateBufferIfEmpty(); BufferInfo* buf = GetFirstBuffer(); @@ -586,8 +650,18 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, if (copy_to_overlap_buffer) { // Data is overlapping i.e. some of the data has been copied to overlap // buffer and remaining will be updated below. + // Note: why do we not end up performing a duplicate copy when we already + // copy to the overlap buffer in HandleOverlappingAsyncData / + // HandleOverlappingSyncData? The reason is that when we call + // CopyDataToOverlapBuffer, if the buffer is only a "partial hit", then we + // clear it out since it does not have any more useful data once we copy + // to the overlap buffer. Once we reallocate a fresh buffer, that buffer + // will have no data, and it will be the "first" buffer when num_buffers_ + // = 1. When num_buffers_ > 1, we call ClearOutdatedData() so we know + // that, if we get to this point in the control flow, the "front" buffer + // has to have the data we need. size_t initial_buf_size = overlap_buf_->CurrentSize(); - CopyDataToBuffer(buf, offset, length); + CopyDataToOverlapBuffer(buf, offset, length); UpdateStats( /*found_in_buffer=*/false, overlap_buf_->CurrentSize() - initial_buf_size); @@ -636,10 +710,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, UpdateStats(/*found_in_buffer=*/false, (buf->offset_ + buf->CurrentSize() - offset)); } - ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail*/ - true, start_offset1, alignment, length, readahead_size, - start_offset1, end_offset1, read_len1, - aligned_useful_len1); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/ + true, /*use_fs_buffer=*/use_fs_buffer, start_offset1, + alignment, length, readahead_size, start_offset1, + end_offset1, read_len1, aligned_useful_len1); } else { UpdateStats(/*found_in_buffer=*/true, original_length); } @@ -662,10 +736,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, } } - // Copy remaining requested bytes to overlap_buffer. No need to update stats - // as data is prefetched during this call. + // Copy remaining requested bytes to overlap_buf_. No need to + // update stats as data is prefetched during this call. if (copy_to_overlap_buffer && length > 0) { - CopyDataToBuffer(buf, offset, length); + CopyDataToOverlapBuffer(buf, offset, length); } return s; } @@ -892,7 +966,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, std::string msg; Status s; - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0; size_t offset_to_read = static_cast(offset); uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0; @@ -915,6 +989,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Prefetch full data + readahead_size in the first buffer. 
if (is_eligible_for_prefetching || reader->use_direct_io()) { ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false, + /*use_fs_buffer=*/false, /*prev_buf_end_offset=*/start_offset1, alignment, n, readahead_size, start_offset1, end_offset1, read_len1, aligned_useful_len1); @@ -923,7 +998,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, start_offset1 = offset_to_read; end_offset1 = offset_to_read + n; roundup_len1 = end_offset1 - start_offset1; - PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, false, + PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, + /*refit_tail=*/false, /*use_fs_buffer=*/false, aligned_useful_len1); assert(aligned_useful_len1 == 0); assert(roundup_len1 >= aligned_useful_len1); @@ -970,7 +1046,7 @@ Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts, uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0; size_t read_len2 = 0; ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, - /*refit_tail=*/false, + /*refit_tail=*/false, /*use_fs_buffer=*/false, /*prev_buf_end_offset=*/end_offset1, alignment, /*length=*/0, readahead_size, start_offset2, end_offset2, read_len2, aligned_useful_len2); diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index dfa8389294..833e6e6015 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -15,7 +15,9 @@ #include #include +#include "file/random_access_file_reader.h" #include "file/readahead_file_info.h" +#include "file_util.h" #include "monitoring/statistics_impl.h" #include "port/port.h" #include "rocksdb/env.h" @@ -149,6 +151,9 @@ enum class FilePrefetchBufferUsage { // // If num_buffers_ == 1, it's a sequential read flow. Read API will be called on // that one buffer whenever the data is requested and is not in the buffer. +// When reusing the file system allocated buffer, overlap_buf_ is used if the +// main buffer only contains part of the requested data. It is returned to +// the caller after the remaining data is fetched. // If num_buffers_ > 1, then the data is prefetched asynchronosuly in the // buffers whenever the data is consumed from the buffers and that buffer is // freed. @@ -206,10 +211,15 @@ class FilePrefetchBuffer { assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || (num_file_reads_ == 0)); - // If num_buffers_ > 1, data is asynchronously filled in the - // queue. As result, data can be overlapping in two buffers. It copies the - // data to overlap_buf_ in order to to return continuous buffer. - if (num_buffers_ > 1) { + // overlap_buf_ is used whenever the main buffer only has part of the + // requested data. The relevant data is copied into overlap_buf_ and the + // remaining data is copied in later to satisfy the user's request. This is + // used in both the synchronous (num_buffers_ = 1) and asynchronous + // (num_buffers_ > 1) cases. In the asynchronous case, the requested data + // may be spread out over 2 buffers. + if (num_buffers_ > 1 || + (fs_ != nullptr && + CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer))) { overlap_buf_ = new BufferInfo(); } @@ -379,12 +389,21 @@ class FilePrefetchBuffer { void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg); void TEST_GetBufferOffsetandSize( - std::vector>& buffer_info) { + std::vector>& buffer_info) { for (size_t i = 0; i < bufs_.size(); i++) { - buffer_info[i].first = bufs_[i]->offset_; - buffer_info[i].second = bufs_[i]->async_read_in_progress_ - ? 
bufs_[i]->async_req_len_ - : bufs_[i]->CurrentSize(); + std::get<0>(buffer_info[i]) = bufs_[i]->offset_; + std::get<1>(buffer_info[i]) = bufs_[i]->async_read_in_progress_ + ? bufs_[i]->async_req_len_ + : bufs_[i]->CurrentSize(); + std::get<2>(buffer_info[i]) = bufs_[i]->async_read_in_progress_; + } + } + + void TEST_GetOverlapBufferOffsetandSize( + std::pair& buffer_info) { + if (overlap_buf_ != nullptr) { + buffer_info.first = overlap_buf_->offset_; + buffer_info.second = overlap_buf_->CurrentSize(); } } @@ -394,7 +413,7 @@ class FilePrefetchBuffer { // required. void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset, size_t roundup_len, bool refit_tail, - uint64_t& aligned_useful_len); + bool use_fs_buffer, uint64_t& aligned_useful_len); void AbortOutdatedIO(uint64_t offset); @@ -418,7 +437,8 @@ class FilePrefetchBuffer { uint64_t start_offset); // Copy the data from src to overlap_buf_. - void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length); + void CopyDataToOverlapBuffer(BufferInfo* src, uint64_t& offset, + size_t& length); bool IsBlockSequential(const size_t& offset) { return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); @@ -465,6 +485,50 @@ class FilePrefetchBuffer { return true; } + // Whether we reuse the file system provided buffer + // Until we also handle the async read case, only enable this optimization + // for the synchronous case when num_buffers_ = 1. + bool UseFSBuffer(RandomAccessFileReader* reader) { + return reader->file() != nullptr && !reader->use_direct_io() && + fs_ != nullptr && + CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer) && + num_buffers_ == 1; + } + + // When we are reusing the file system provided buffer, we are not concerned + // with alignment. However, quite a bit of prefetch code incorporates + // alignment, so we can put in 1 to keep the code simpler. 
+ size_t GetRequiredBufferAlignment(RandomAccessFileReader* reader) { + if (UseFSBuffer(reader)) { + return 1; + } + return reader->file()->GetRequiredBufferAlignment(); + } + + // Reuses the file system allocated buffer to avoid an extra copy + IOStatus FSBufferDirectRead(RandomAccessFileReader* reader, BufferInfo* buf, + const IOOptions& opts, uint64_t offset, size_t n, + Slice& result) { + FSReadRequest read_req; + read_req.offset = offset; + read_req.len = n; + read_req.scratch = nullptr; + IOStatus s = reader->MultiRead(opts, &read_req, 1, nullptr); + if (!s.ok()) { + return s; + } + s = read_req.status; + if (!s.ok()) { + return s; + } + buf->buffer_.SetBuffer(read_req.result.size(), + std::move(read_req.fs_scratch)); + buf->offset_ = offset; + buf->initial_end_offset_ = offset + read_req.result.size(); + result = read_req.result; + return s; + } + void DestroyAndClearIOHandle(BufferInfo* buf) { if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { buf->del_fn_(buf->io_handle_); @@ -474,11 +538,16 @@ class FilePrefetchBuffer { buf->async_read_in_progress_ = false; } - Status HandleOverlappingData(const IOOptions& opts, - RandomAccessFileReader* reader, uint64_t offset, - size_t length, size_t readahead_size, - bool& copy_to_third_buffer, uint64_t& tmp_offset, - size_t& tmp_length); + void HandleOverlappingSyncData(uint64_t offset, size_t length, + uint64_t& tmp_offset, size_t& tmp_length, + bool& use_overlap_buffer); + + Status HandleOverlappingAsyncData(const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t offset, size_t length, + size_t readahead_size, + bool& copy_to_third_buffer, + uint64_t& tmp_offset, size_t& tmp_length); bool TryReadFromCacheUntracked(const IOOptions& opts, RandomAccessFileReader* reader, @@ -487,11 +556,11 @@ class FilePrefetchBuffer { bool for_compaction = false); void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block, - bool refit_tail, uint64_t prev_buf_end_offset, - size_t alignment, size_t length, - size_t readahead_size, uint64_t& offset, - uint64_t& end_offset, size_t& read_len, - uint64_t& aligned_useful_len); + bool refit_tail, bool use_fs_buffer, + uint64_t prev_buf_end_offset, size_t alignment, + size_t length, size_t readahead_size, + uint64_t& offset, uint64_t& end_offset, + size_t& read_len, uint64_t& aligned_useful_len); void UpdateStats(bool found_in_buffer, size_t length_found) { if (found_in_buffer) { diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 62d44be544..534d0e22d6 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -3290,6 +3290,426 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) { /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576); } +class FSBufferPrefetchTest : public testing::Test, + public ::testing::WithParamInterface { + public: + // Mock file system supporting the kFSBuffer buffer reuse operation + class BufferReuseFS : public FileSystemWrapper { + public: + explicit BufferReuseFS(const std::shared_ptr& _target) + : FileSystemWrapper(_target) {} + ~BufferReuseFS() override {} + const char* Name() const override { return "BufferReuseFS"; } + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + public: + explicit WrappedRandomAccessFile( + std::unique_ptr& file) + : FSRandomAccessFileOwnerWrapper(std::move(file)) {} + + IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, + 
const IOOptions& options, + IODebugContext* dbg) override { + for (size_t i = 0; i < num_reqs; ++i) { + FSReadRequest& req = reqs[i]; + FSAllocationPtr buffer(new char[req.len], [](void* ptr) { + delete[] static_cast(ptr); + }); + req.fs_scratch = std::move(buffer); + req.status = Read(req.offset, req.len, options, &req.result, + static_cast(req.fs_scratch.get()), dbg); + } + return IOStatus::OK(); + } + }; + + std::unique_ptr file; + IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg); + EXPECT_OK(s); + result->reset(new WrappedRandomAccessFile(file)); + return s; + } + + void SupportedOps(int64_t& supported_ops) override { + supported_ops = 1 << FSSupportedOps::kAsyncIO; + supported_ops |= 1 << FSSupportedOps::kFSBuffer; + } + }; + + void SetUp() override { + SetupSyncPointsToMockDirectIO(); + env_ = Env::Default(); + bool use_async_prefetch = GetParam(); + if (use_async_prefetch) { + fs_ = FileSystem::Default(); + } else { + fs_ = std::make_shared(FileSystem::Default()); + } + + test_dir_ = test::PerThreadDBPath("fs_buffer_prefetch_test"); + ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + stats_ = CreateDBStatistics(); + } + + void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } + + void Write(const std::string& fname, const std::string& content) { + std::unique_ptr f; + ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); + ASSERT_OK(f->Append(content, IOOptions(), nullptr)); + ASSERT_OK(f->Close(IOOptions(), nullptr)); + } + + void Read(const std::string& fname, const FileOptions& opts, + std::unique_ptr* reader) { + std::string fpath = Path(fname); + std::unique_ptr f; + ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); + reader->reset(new RandomAccessFileReader( + std::move(f), fpath, env_->GetSystemClock().get(), + /*io_tracer=*/nullptr, stats_.get())); + } + + FileSystem* fs() { return fs_.get(); } + Statistics* stats() { return stats_.get(); } + SystemClock* clock() { return env_->GetSystemClock().get(); } + + private: + Env* env_; + std::shared_ptr fs_; + std::string test_dir_; + std::shared_ptr stats_; + + std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } +}; + +INSTANTIATE_TEST_CASE_P(FSBufferPrefetchTest, FSBufferPrefetchTest, + ::testing::Bool()); + +TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) { + // Check that the main buffer, the overlap_buf_, and the secondary buffer (in + // the case of num_buffers_ > 1) are populated correctly while reading a 32 + // KiB file + std::string fname = "fs-buffer-prefetch-stats-internals"; + Random rand(0); + std::string content = rand.RandomString(32768); + Write(fname, content); + + FileOptions opts; + std::unique_ptr r; + Read(fname, opts, &r); + + std::shared_ptr stats = CreateDBStatistics(); + ReadaheadParams readahead_params; + readahead_params.initial_readahead_size = 8192; + readahead_params.max_readahead_size = 8192; + bool use_async_prefetch = GetParam(); + size_t num_buffers = use_async_prefetch ? 2 : 1; + readahead_params.num_buffers = num_buffers; + + FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(), + stats.get()); + + int overlap_buffer_write_ct = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete", + [&](void* /*arg*/) { overlap_buffer_write_ct++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Slice result; + // Read 4096 bytes at offset 0. 
+ Status s; + std::vector> buffer_info(num_buffers); + std::pair overlap_buffer_info; + bool could_read_from_cache = + fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s); + // Platforms that don't have IO uring may not support async IO. + if (use_async_prefetch && s.IsNotSupported()) { + return; + } + ASSERT_TRUE(could_read_from_cache); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), 0); + ASSERT_EQ(strncmp(result.data(), content.substr(0, 4096).c_str(), 4096), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Cut the readahead of 8192 in half. + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Buffers: 0-8192, 8192-12288 + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 4096 + 8192 / 2); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // Read at offset 0 with length 4096 + 8192 = 12288. + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + the 8192 of prefetched data + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192); + } + + // Simulate a block cache hit + fpb.UpdateReadPattern(4096, 4096, false); + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + 4096); // 8192-12288 + ASSERT_EQ(strncmp(result.data(), content.substr(8192, 8192).c_str(), 8192), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + + if (use_async_prefetch) { + // Our buffers were 0-8192, 8192-12288 at the start so we had some + // overlapping data in the second buffer + // We clean up outdated buffers so 0-8192 gets freed for more prefetching. + // Our remaining buffer 8192-12288 has data that we want, so we can reuse it + // We end up with: 8192-20480, 20480-24576 + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 8192); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2)); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // We only have 0-12288 cached, so reading from 8192-16384 will trigger a + // prefetch up through 16384 + 8192 = 24576. 
+ // Overlap buffer reuses bytes 8192 to 12288 + ASSERT_EQ(overlap_buffer_info.first, 8192); + ASSERT_EQ(overlap_buffer_info.second, 8192); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // We spill to the overlap buffer so the remaining buffer only has the + // missing and prefetched part + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + } + + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 12288, 4096, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 1); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + 4096); // 12288-16384 + ASSERT_EQ(strncmp(result.data(), content.substr(12288, 4096).c_str(), 4096), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + + if (use_async_prefetch) { + // Same as before: 8192-20480, 20480-24576 (cache hit in first buffer) + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 8192); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2)); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // The main buffer has 12288-24576, so 12288-16384 is a cache hit. + // Overlap buffer does not get used + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + ASSERT_EQ(overlap_buffer_info.first, 8192); + ASSERT_EQ(overlap_buffer_info.second, 8192); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // Main buffer stays the same + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + } + + // Read from 16000-26000 (start and end do not meet normal alignment) + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 16000, 10000, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ( + stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576); + ASSERT_EQ(strncmp(result.data(), content.substr(16000, 10000).c_str(), 10000), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Overlap buffer reuses bytes 16000 to 20480 + ASSERT_EQ(overlap_buffer_info.first, 16000); + ASSERT_EQ(overlap_buffer_info.second, 10000); + // First 2 writes are reusing existing 2 buffers. Last write fills in + // what could not be found in either. 
+ ASSERT_EQ(overlap_buffer_write_ct, 3); + ASSERT_EQ(std::get<0>(buffer_info[0]), 24576); + ASSERT_EQ(std::get<1>(buffer_info[0]), 32768 - 24576); + ASSERT_EQ(std::get<0>(buffer_info[1]), 32768); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); + ASSERT_TRUE(std::get<2>( + buffer_info[1])); // in progress async request (otherwise we should not + // be getting 4096 for the size) + } else { + // Overlap buffer reuses bytes 16000 to 24576 + ASSERT_EQ(overlap_buffer_info.first, 16000); + ASSERT_EQ(overlap_buffer_info.second, 10000); + ASSERT_EQ(overlap_buffer_write_ct, 4); + // Even if you try to readahead to offset 16000 + 10000 + 8192, there are + // only 32768 bytes in the original file + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288 + 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); + } +} + +TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) { + // Check that the main buffer, the overlap_buf_, and the secondary buffer (in + // the case of num_buffers_ > 1) are populated correctly + // while reading with no regard to alignment + std::string fname = "fs-buffer-prefetch-unaligned-reads"; + Random rand(0); + std::string content = rand.RandomString(1000); + Write(fname, content); + + FileOptions opts; + std::unique_ptr r; + Read(fname, opts, &r); + + std::shared_ptr stats = CreateDBStatistics(); + ReadaheadParams readahead_params; + // Readahead size will double each time + readahead_params.initial_readahead_size = 5; + readahead_params.max_readahead_size = 100; + bool use_async_prefetch = GetParam(); + size_t num_buffers = use_async_prefetch ? 2 : 1; + readahead_params.num_buffers = num_buffers; + FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(), + stats.get()); + + int overlap_buffer_write_ct = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete", + [&](void* /*arg*/) { overlap_buffer_write_ct++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Slice result; + // Read 3 bytes at offset 5 + Status s; + std::vector> buffer_info(num_buffers); + std::pair overlap_buffer_info; + bool could_read_from_cache = + fpb.TryReadFromCache(IOOptions(), r.get(), 5, 3, &result, &s); + // Platforms that don't have IO uring may not support async IO. 
+ if (use_async_prefetch && s.IsNotSupported()) { + return; + } + ASSERT_TRUE(could_read_from_cache); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(5, 3).c_str(), 3), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // With async prefetching, we still try to align to 4096 bytes, so + // our main buffer read and secondary buffer prefetch are rounded up + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + // This buffer won't actually get filled up with data since there is nothing + // after 1000 + ASSERT_EQ(std::get<0>(buffer_info[1]), 4096); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); + ASSERT_TRUE(std::get<2>(buffer_info[1])); // in progress async request + } else { + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + 5 of prefetched data (5 - 13) + ASSERT_EQ(std::get<0>(buffer_info[0]), 5); + ASSERT_EQ(std::get<1>(buffer_info[0]), 3 + 5); + } + + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16, 7, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(16, 7).c_str(), 7), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + // The remaining requests will be the same when use_async_prefetch is true + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Complete miss: read 7 bytes at offset 16 + // Overlap buffer is not used (no partial hit) + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + 10 of prefetched data (16 - 33) + ASSERT_EQ(std::get<0>(buffer_info[0]), 16); + ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10); + } + + // Go backwards + if (use_async_prefetch) { + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s)); + } else { + // TryReadFromCacheUntracked returns false since the offset + // requested is less than the start of our buffer + ASSERT_FALSE( + fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s)); + } + ASSERT_EQ(s, Status::OK()); + + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 27, 6, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(27, 6).c_str(), 6), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Complete hit + // Overlap buffer still not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer unchanged + ASSERT_EQ(std::get<0>(buffer_info[0]), 16); + ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10); + } + + 
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 30, 20, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(30, 20).c_str(), 20), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Partial hit (overlapping with end of main buffer) + // Overlap buffer is used because we already had 30-33 + ASSERT_EQ(overlap_buffer_info.first, 30); + ASSERT_EQ(overlap_buffer_info.second, 20); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // Main buffer has up to offset 50 + 20 of prefetched data + ASSERT_EQ(std::get<0>(buffer_info[0]), 33); + ASSERT_EQ(std::get<1>(buffer_info[0]), (50 - 33) + 20); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index e2f757f5bf..46f5d1c262 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -199,7 +199,7 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, buf.Read(scratch, offset_advance, res_len); } else { scratch = buf.BufferStart() + offset_advance; - aligned_buf->reset(buf.Release()); + *aligned_buf = buf.Release(); } } *result = Slice(scratch, res_len); @@ -384,7 +384,7 @@ IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts, scratch += r.len; } - aligned_buf->reset(buf.Release()); + *aligned_buf = buf.Release(); fs_reqs = aligned_reqs.data(); num_fs_reqs = aligned_reqs.size(); } @@ -598,8 +598,7 @@ void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req, // Set aligned_buf provided by user without additional copy. user_req.scratch = read_async_info->buf_.BufferStart() + offset_advance_len; - read_async_info->user_aligned_buf_->reset( - read_async_info->buf_.Release()); + *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release(); } user_req.result = Slice(user_req.scratch, res_len); } else { diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 3155136776..3f590be1ca 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -25,7 +25,7 @@ class Statistics; class HistogramImpl; class SystemClock; -using AlignedBuf = std::unique_ptr; +using AlignedBuf = FSAllocationPtr; // Align the request r according to alignment and return the aligned result. FSReadRequest Align(const FSReadRequest& r, size_t alignment); diff --git a/table/table_test.cc b/table/table_test.cc index d4e4b3936d..b198baada4 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3252,14 +3252,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(1); + std::vector> buffer_info(1); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); // It won't prefetch the data of cache hit. // One block data. 
- ASSERT_EQ(buffer_info[0].second, 4096); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3290,14 +3290,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(1); + std::vector> buffer_info(1); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); // It won't prefetch the data of cache hit. // 3 blocks data. - ASSERT_EQ(buffer_info[0].second, 12288); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); for (; kv_iter != kvmap.end() && iter->Valid(); kv_iter++) { ASSERT_EQ(iter->key(), kv_iter->first); @@ -3313,8 +3313,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { // Second Prefetch. prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].second, 20480); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 20480); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3401,13 +3401,13 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); - ASSERT_EQ(buffer_info[1].second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); + ASSERT_EQ(std::get<1>(buffer_info[1]), 0); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 2); @@ -3440,21 +3440,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); { // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. 
InternalKey ikey_tmp("00000360", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3493,21 +3493,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { ->prefetch_buffer(); { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. InternalKey ikey_tmp("00000540", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3527,21 +3527,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. InternalKey ikey_tmp("00000585", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3561,21 +3561,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); // 2nd Buffer Verification. 
InternalKey ikey_tmp("00000615", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3595,21 +3595,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); // 2nd Buffer Verification. InternalKey ikey_tmp("00000630", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 0); diff --git a/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md b/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md new file mode 100644 index 0000000000..f35735ccc4 --- /dev/null +++ b/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md @@ -0,0 +1 @@ +* Enable reuse of file system allocated buffer for synchronous prefetching. diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index acab56c215..dbe34b6da0 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -12,7 +12,7 @@ #include #include "port/port.h" - +#include "rocksdb/file_system.h" namespace ROCKSDB_NAMESPACE { // This file contains utilities to handle the alignment of pages and buffers. @@ -56,7 +56,7 @@ inline size_t Rounddown(size_t x, size_t y) { return (x / y) * y; } // copy_offset, copy_len); class AlignedBuffer { size_t alignment_; - std::unique_ptr buf_; + FSAllocationPtr buf_; size_t capacity_; size_t cursize_; char* bufstart_; @@ -100,11 +100,11 @@ class AlignedBuffer { void Clear() { cursize_ = 0; } - char* Release() { + FSAllocationPtr Release() { cursize_ = 0; capacity_ = 0; bufstart_ = nullptr; - return buf_.release(); + return std::move(buf_); } void Alignment(size_t alignment) { @@ -113,6 +113,17 @@ class AlignedBuffer { alignment_ = alignment; } + // Points the buffer to new_buf (taking ownership) without allocating extra + // memory or performing any data copies. This method is called when we want to + // reuse the buffer provided by the file system + void SetBuffer(size_t size, FSAllocationPtr&& new_buf) { + alignment_ = 1; + capacity_ = size; + cursize_ = size; + bufstart_ = reinterpret_cast(new_buf.get()); + buf_ = std::move(new_buf); + } + // Allocates a new buffer and sets the start position to the first aligned // byte. 
  //
@@ -156,7 +167,11 @@ class AlignedBuffer {
     bufstart_ = new_bufstart;
     capacity_ = new_capacity;
-    buf_.reset(new_buf);
+    // buf_ is an FSAllocationPtr which takes in a deleter
+    // we can just wrap the regular default delete that would have been called
+    buf_ = std::unique_ptr<void, std::function<void(void*)>>(
+        static_cast<void*>(new_buf),
+        [](void* p) { delete[] static_cast<char*>(p); });
   }

   // Append to the buffer.
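
For readers trying to map these prefetch changes back to the `FileSystem` contract they rely on, the sketch below condenses the `BufferReuseFS` test helper added in `prefetch_test.cc` above. It is illustrative only: the class names are made up, and the plain `new char[]` allocation stands in for whatever buffer a real file system already owns and wants to hand back through `fs_scratch`.

```cpp
#include <memory>

#include "rocksdb/file_system.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical random-access file that hands its own buffer back to the
// caller via fs_scratch (the kFSBuffer flow).
class ExampleBufferReuseFile : public FSRandomAccessFileOwnerWrapper {
 public:
  using FSRandomAccessFileOwnerWrapper::FSRandomAccessFileOwnerWrapper;

  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                     const IOOptions& options, IODebugContext* dbg) override {
    for (size_t i = 0; i < num_reqs; ++i) {
      FSReadRequest& req = reqs[i];
      // With kFSBuffer, req.scratch is nullptr: the file system supplies the
      // result buffer and transfers ownership through req.fs_scratch.
      FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
        delete[] static_cast<char*>(ptr);
      });
      req.fs_scratch = std::move(buffer);
      req.status = Read(req.offset, req.len, options, &req.result,
                        static_cast<char*>(req.fs_scratch.get()), dbg);
    }
    return IOStatus::OK();
  }
};

// Hypothetical file system wrapper that advertises buffer reuse support.
class ExampleBufferReuseFS : public FileSystemWrapper {
 public:
  explicit ExampleBufferReuseFS(const std::shared_ptr<FileSystem>& target)
      : FileSystemWrapper(target) {}
  const char* Name() const override { return "ExampleBufferReuseFS"; }

  // Advertising kFSBuffer is what makes FilePrefetchBuffer::UseFSBuffer()
  // return true (together with non-direct I/O and num_buffers_ == 1).
  void SupportedOps(int64_t& supported_ops) override {
    supported_ops = 1 << FSSupportedOps::kFSBuffer;
  }
};

}  // namespace ROCKSDB_NAMESPACE
```

A complete implementation would also override `NewRandomAccessFile` to wrap the files it opens in `ExampleBufferReuseFile`, as the `BufferReuseFS` test helper does.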
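On the consuming side, the core of the new `FSBufferDirectRead` helper is the pattern "issue a single-request `MultiRead` with a null `scratch`, then adopt `fs_scratch` instead of copying the result". A stripped-down, hypothetical version written against the public `FSRandomAccessFile` API might look roughly like this (`ReadIntoFSBuffer` is not part of this PR):

```cpp
#include <utility>

#include "rocksdb/file_system.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical helper: read [offset, offset + n) and take ownership of the
// buffer the file system filled, avoiding a memcpy into a caller-owned buffer.
IOStatus ReadIntoFSBuffer(FSRandomAccessFile* file, const IOOptions& opts,
                          uint64_t offset, size_t n, Slice* result,
                          FSAllocationPtr* owned_buffer) {
  FSReadRequest req;
  req.offset = offset;
  req.len = n;
  req.scratch = nullptr;  // Ask the file system to supply the buffer.
  IOStatus s = file->MultiRead(&req, 1, opts, /*dbg=*/nullptr);
  if (!s.ok()) {
    return s;
  }
  if (!req.status.ok()) {
    return req.status;
  }
  *result = req.result;                       // Points into fs_scratch.
  *owned_buffer = std::move(req.fs_scratch);  // Keep the backing memory alive.
  return IOStatus::OK();
}

}  // namespace ROCKSDB_NAMESPACE
```

This mirrors what `FSBufferDirectRead` does internally before handing the buffer to `AlignedBuffer::SetBuffer`; the caller must keep the returned `FSAllocationPtr` alive for as long as the `Slice` is in use.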