diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 73050aed79..764f450c03 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -22,11 +22,9 @@ namespace ROCKSDB_NAMESPACE {
 
-void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
-                                              uint64_t offset,
-                                              size_t roundup_len,
-                                              bool refit_tail,
-                                              uint64_t& aligned_useful_len) {
+void FilePrefetchBuffer::PrepareBufferForRead(
+    BufferInfo* buf, size_t alignment, uint64_t offset, size_t roundup_len,
+    bool refit_tail, bool use_fs_buffer, uint64_t& aligned_useful_len) {
   uint64_t aligned_useful_offset_in_buf = 0;
   bool copy_data_to_new_buffer = false;
   // Check if requested bytes are in the existing buffer_.
@@ -39,6 +37,9 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
     // new buffer is created.
     aligned_useful_offset_in_buf =
         Rounddown(static_cast<size_t>(offset - buf->offset_), alignment);
+    // aligned_useful_len is passed by reference and used to calculate how much
+    // data needs to be read, so it is needed regardless of whether
+    // use_fs_buffer is true.
     aligned_useful_len = static_cast<uint64_t>(buf->CurrentSize()) -
                          aligned_useful_offset_in_buf;
     assert(aligned_useful_offset_in_buf % alignment == 0);
@@ -53,6 +54,16 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
     }
   }
 
+  // The buffer allocation / tail refitting below does not apply when
+  // use_fs_buffer is true. If we allocate a new buffer, we end up throwing it
+  // away later when we reuse the file system allocated buffer. If we refit
+  // the tail in the main buffer, we don't have a place to put the next chunk
+  // of data provided by the file system (without performing another copy,
+  // which we are trying to avoid in the first place).
+  if (use_fs_buffer) {
+    return;
+  }
+
   // Create a new buffer only if current capacity is not sufficient, and memcopy
   // bytes from old buffer if needed (i.e., if aligned_useful_len is greater
   // than 0).
@@ -62,8 +73,8 @@ void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
         static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
         aligned_useful_offset_in_buf, static_cast<size_t>(aligned_useful_len));
   } else if (aligned_useful_len > 0 && refit_tail) {
-    // New buffer not needed. But memmove bytes from tail to the beginning since
-    // aligned_useful_len is greater than 0.
+    // New buffer not needed. But memmove bytes from tail to the beginning
+    // since aligned_useful_len is greater than 0.
    buf->buffer_.RefitTail(static_cast<size_t>(aligned_useful_offset_in_buf),
                           static_cast<size_t>(aligned_useful_len));
   } else if (aligned_useful_len > 0) {
@@ -84,9 +95,18 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
                                 uint64_t read_len, uint64_t aligned_useful_len,
                                 uint64_t start_offset) {
   Slice result;
-  char* to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
-  Status s = reader->Read(opts, start_offset + aligned_useful_len, read_len,
-                          &result, to_buf, /*aligned_buf=*/nullptr);
+  Status s;
+  char* to_buf = nullptr;
+  bool use_fs_buffer = UseFSBuffer(reader);
+  if (use_fs_buffer) {
+    s = FSBufferDirectRead(reader, buf, opts, start_offset + aligned_useful_len,
+                           read_len, result);
+  } else {
+    to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
+    s = reader->Read(opts, start_offset + aligned_useful_len, read_len, &result,
+                     to_buf, /*aligned_buf=*/nullptr);
+  }
+
 #ifndef NDEBUG
   if (result.size() < read_len) {
     // Fake an IO error to force db_stress fault injection to ignore
@@ -97,7 +117,7 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
   if (!s.ok()) {
     return s;
   }
-  if (result.data() != to_buf) {
+  if (!use_fs_buffer && result.data() != to_buf) {
     // If the read is coming from some other buffer already in memory (such as
     // mmap) then it would be inefficient to create another copy in this
     // FilePrefetchBuffer. The caller is expected to exclude this case.
@@ -108,8 +128,11 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
   if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
     RecordTick(stats_, PREFETCH_BYTES, read_len);
   }
-  // Update the buffer size.
-  buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size());
+  if (!use_fs_buffer) {
+    // Update the buffer size. When we reuse the FS buffer, the buffer size
+    // has already been set explicitly.
+    buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size());
+  }
   return s;
 }
 
@@ -157,13 +180,14 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
     return Status::OK();
   }
 
-  size_t alignment = reader->file()->GetRequiredBufferAlignment();
+  size_t alignment = GetRequiredBufferAlignment(reader);
   uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0;
   size_t read_len = 0;
+  bool use_fs_buffer = UseFSBuffer(reader);
 
   ReadAheadSizeTuning(buf, /*read_curr_block=*/true,
-                      /*refit_tail=*/true, rounddown_offset, alignment, 0, n,
-                      rounddown_offset, roundup_end, read_len,
+                      /*refit_tail=*/true, use_fs_buffer, rounddown_offset,
+                      alignment, 0, n, rounddown_offset, roundup_end, read_len,
                       aligned_useful_len);
 
   Status s;
@@ -178,12 +202,13 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
   }
 
 // Copy data from src to overlap_buf_.
-void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, - size_t& length) { +void FilePrefetchBuffer::CopyDataToOverlapBuffer(BufferInfo* src, + uint64_t& offset, + size_t& length) { if (length == 0) { return; } - + assert(src->IsOffsetInBuffer(offset)); uint64_t copy_offset = (offset - src->offset_); size_t copy_len = 0; if (src->IsDataBlockInBuffer(offset, length)) { @@ -194,10 +219,8 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, } BufferInfo* dst = overlap_buf_; - memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(), - src->buffer_.BufferStart() + copy_offset, copy_len); - - dst->buffer_.Size(dst->CurrentSize() + copy_len); + assert(copy_len <= dst->buffer_.Capacity() - dst->buffer_.CurrentSize()); + dst->buffer_.Append(src->buffer_.BufferStart() + copy_offset, copy_len); // Update offset and length. offset += copy_len; @@ -208,6 +231,7 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, if (length > 0) { FreeFrontBuffer(); } + TEST_SYNC_POINT("FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete"); } // Clear the buffers if it contains outdated data. Outdated data can be because @@ -355,7 +379,7 @@ void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) { // of ReadAsync to make sure it doesn't read anything from // previous buffer which is already prefetched. void FilePrefetchBuffer::ReadAheadSizeTuning( - BufferInfo* buf, bool read_curr_block, bool refit_tail, + BufferInfo* buf, bool read_curr_block, bool refit_tail, bool use_fs_buffer, uint64_t prev_buf_end_offset, size_t alignment, size_t length, size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset, size_t& read_len, uint64_t& aligned_useful_len) { @@ -408,7 +432,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( uint64_t roundup_len = end_offset - start_offset; PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail, - aligned_useful_len); + use_fs_buffer, aligned_useful_len); assert(roundup_len >= aligned_useful_len); // Update the buffer offset. @@ -422,11 +446,43 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( (end_offset - start_offset)); } +// This is for when num_buffers_ = 1. +// If we are reusing the file system allocated buffer, and only some of the +// requested data is in the buffer, we copy the relevant data to overlap_buf_ +void FilePrefetchBuffer::HandleOverlappingSyncData(uint64_t offset, + size_t length, + uint64_t& tmp_offset, + size_t& tmp_length, + bool& use_overlap_buffer) { + if (IsBufferQueueEmpty()) { + return; + } + BufferInfo* buf = GetFirstBuffer(); + // We should only be calling this when num_buffers_ = 1, so there should + // not be any async reads. + assert(!buf->async_read_in_progress_); + + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsOffsetInBuffer(offset) && + buf->offset_ + buf->CurrentSize() < offset + length) { + // Allocated overlap_buf_ is just enough to hold the result for the user + // Alignment does not matter here + use_overlap_buffer = true; + overlap_buf_->ClearBuffer(); + overlap_buf_->buffer_.Alignment(1); + overlap_buf_->buffer_.AllocateNewBuffer(length); + overlap_buf_->offset_ = offset; + CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length); + UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize()); + } +} + +// This is for when num_buffers_ > 1. 
// If data is overlapping between two buffers then during this call: // - data from first buffer is copied into overlapping buffer, // - first is removed from bufs_ and freed so that it can be used for async // prefetching of further data. -Status FilePrefetchBuffer::HandleOverlappingData( +Status FilePrefetchBuffer::HandleOverlappingAsyncData( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t length, size_t readahead_size, bool& copy_to_overlap_buffer, uint64_t& tmp_offset, size_t& tmp_length) { @@ -436,7 +492,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( } Status s; - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); BufferInfo* buf = GetFirstBuffer(); @@ -470,7 +526,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( overlap_buf_->offset_ = offset; copy_to_overlap_buffer = true; - CopyDataToBuffer(buf, tmp_offset, tmp_length); + CopyDataToOverlapBuffer(buf, tmp_offset, tmp_length); UpdateStats(/*found_in_buffer=*/false, overlap_buf_->CurrentSize()); // Call async prefetching on freed buffer since data has been consumed @@ -495,8 +551,8 @@ Status FilePrefetchBuffer::HandleOverlappingData( uint64_t end_offset = start_offset, aligned_useful_len = 0; ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, - /*refit_tail=*/false, next_buf->offset_ + second_size, - alignment, + /*refit_tail=*/false, /*use_fs_buffer=*/false, + next_buf->offset_ + second_size, alignment, /*length=*/0, readahead_size, start_offset, end_offset, read_len, aligned_useful_len); if (read_len > 0) { @@ -537,7 +593,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); Status s; uint64_t tmp_offset = offset; size_t tmp_length = length; @@ -550,12 +606,20 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, } ClearOutdatedData(offset, length); - // Handle overlapping data over two buffers. - s = HandleOverlappingData(opts, reader, offset, length, readahead_size, - copy_to_overlap_buffer, tmp_offset, tmp_length); + // Handle overlapping data over two buffers (async prefetching case). + s = HandleOverlappingAsyncData(opts, reader, offset, length, readahead_size, + copy_to_overlap_buffer, tmp_offset, + tmp_length); if (!s.ok()) { return s; } + // Handle partially available data when reusing the file system buffer + // and num_buffers_ = 1 (sync prefetching case) + bool use_fs_buffer = UseFSBuffer(reader); + if (!copy_to_overlap_buffer && use_fs_buffer) { + HandleOverlappingSyncData(offset, length, tmp_offset, tmp_length, + copy_to_overlap_buffer); + } AllocateBufferIfEmpty(); BufferInfo* buf = GetFirstBuffer(); @@ -586,8 +650,18 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, if (copy_to_overlap_buffer) { // Data is overlapping i.e. some of the data has been copied to overlap // buffer and remaining will be updated below. + // Note: why do we not end up performing a duplicate copy when we already + // copy to the overlap buffer in HandleOverlappingAsyncData / + // HandleOverlappingSyncData? The reason is that when we call + // CopyDataToOverlapBuffer, if the buffer is only a "partial hit", then we + // clear it out since it does not have any more useful data once we copy + // to the overlap buffer. 
Once we reallocate a fresh buffer, that buffer + // will have no data, and it will be the "first" buffer when num_buffers_ + // = 1. When num_buffers_ > 1, we call ClearOutdatedData() so we know + // that, if we get to this point in the control flow, the "front" buffer + // has to have the data we need. size_t initial_buf_size = overlap_buf_->CurrentSize(); - CopyDataToBuffer(buf, offset, length); + CopyDataToOverlapBuffer(buf, offset, length); UpdateStats( /*found_in_buffer=*/false, overlap_buf_->CurrentSize() - initial_buf_size); @@ -636,10 +710,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, UpdateStats(/*found_in_buffer=*/false, (buf->offset_ + buf->CurrentSize() - offset)); } - ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail*/ - true, start_offset1, alignment, length, readahead_size, - start_offset1, end_offset1, read_len1, - aligned_useful_len1); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/ + true, /*use_fs_buffer=*/use_fs_buffer, start_offset1, + alignment, length, readahead_size, start_offset1, + end_offset1, read_len1, aligned_useful_len1); } else { UpdateStats(/*found_in_buffer=*/true, original_length); } @@ -662,10 +736,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, } } - // Copy remaining requested bytes to overlap_buffer. No need to update stats - // as data is prefetched during this call. + // Copy remaining requested bytes to overlap_buf_. No need to + // update stats as data is prefetched during this call. if (copy_to_overlap_buffer && length > 0) { - CopyDataToBuffer(buf, offset, length); + CopyDataToOverlapBuffer(buf, offset, length); } return s; } @@ -892,7 +966,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, std::string msg; Status s; - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t alignment = GetRequiredBufferAlignment(reader); size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0; size_t offset_to_read = static_cast(offset); uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0; @@ -915,6 +989,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Prefetch full data + readahead_size in the first buffer. 
if (is_eligible_for_prefetching || reader->use_direct_io()) { ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false, + /*use_fs_buffer=*/false, /*prev_buf_end_offset=*/start_offset1, alignment, n, readahead_size, start_offset1, end_offset1, read_len1, aligned_useful_len1); @@ -923,7 +998,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, start_offset1 = offset_to_read; end_offset1 = offset_to_read + n; roundup_len1 = end_offset1 - start_offset1; - PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, false, + PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, + /*refit_tail=*/false, /*use_fs_buffer=*/false, aligned_useful_len1); assert(aligned_useful_len1 == 0); assert(roundup_len1 >= aligned_useful_len1); @@ -970,7 +1046,7 @@ Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts, uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0; size_t read_len2 = 0; ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, - /*refit_tail=*/false, + /*refit_tail=*/false, /*use_fs_buffer=*/false, /*prev_buf_end_offset=*/end_offset1, alignment, /*length=*/0, readahead_size, start_offset2, end_offset2, read_len2, aligned_useful_len2); diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index dfa8389294..833e6e6015 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -15,7 +15,9 @@ #include #include +#include "file/random_access_file_reader.h" #include "file/readahead_file_info.h" +#include "file_util.h" #include "monitoring/statistics_impl.h" #include "port/port.h" #include "rocksdb/env.h" @@ -149,6 +151,9 @@ enum class FilePrefetchBufferUsage { // // If num_buffers_ == 1, it's a sequential read flow. Read API will be called on // that one buffer whenever the data is requested and is not in the buffer. +// When reusing the file system allocated buffer, overlap_buf_ is used if the +// main buffer only contains part of the requested data. It is returned to +// the caller after the remaining data is fetched. // If num_buffers_ > 1, then the data is prefetched asynchronosuly in the // buffers whenever the data is consumed from the buffers and that buffer is // freed. @@ -206,10 +211,15 @@ class FilePrefetchBuffer { assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || (num_file_reads_ == 0)); - // If num_buffers_ > 1, data is asynchronously filled in the - // queue. As result, data can be overlapping in two buffers. It copies the - // data to overlap_buf_ in order to to return continuous buffer. - if (num_buffers_ > 1) { + // overlap_buf_ is used whenever the main buffer only has part of the + // requested data. The relevant data is copied into overlap_buf_ and the + // remaining data is copied in later to satisfy the user's request. This is + // used in both the synchronous (num_buffers_ = 1) and asynchronous + // (num_buffers_ > 1) cases. In the asynchronous case, the requested data + // may be spread out over 2 buffers. + if (num_buffers_ > 1 || + (fs_ != nullptr && + CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer))) { overlap_buf_ = new BufferInfo(); } @@ -379,12 +389,21 @@ class FilePrefetchBuffer { void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg); void TEST_GetBufferOffsetandSize( - std::vector>& buffer_info) { + std::vector>& buffer_info) { for (size_t i = 0; i < bufs_.size(); i++) { - buffer_info[i].first = bufs_[i]->offset_; - buffer_info[i].second = bufs_[i]->async_read_in_progress_ - ? 
bufs_[i]->async_req_len_
-                                   : bufs_[i]->CurrentSize();
+      std::get<0>(buffer_info[i]) = bufs_[i]->offset_;
+      std::get<1>(buffer_info[i]) = bufs_[i]->async_read_in_progress_
+                                        ? bufs_[i]->async_req_len_
+                                        : bufs_[i]->CurrentSize();
+      std::get<2>(buffer_info[i]) = bufs_[i]->async_read_in_progress_;
+    }
+  }
+
+  void TEST_GetOverlapBufferOffsetandSize(
+      std::pair<uint64_t, size_t>& buffer_info) {
+    if (overlap_buf_ != nullptr) {
+      buffer_info.first = overlap_buf_->offset_;
+      buffer_info.second = overlap_buf_->CurrentSize();
+    }
   }
 
@@ -394,7 +413,7 @@ class FilePrefetchBuffer {
   // required.
   void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset,
                             size_t roundup_len, bool refit_tail,
-                            uint64_t& aligned_useful_len);
+                            bool use_fs_buffer, uint64_t& aligned_useful_len);
 
   void AbortOutdatedIO(uint64_t offset);
 
@@ -418,7 +437,8 @@ class FilePrefetchBuffer {
                 uint64_t start_offset);
 
   // Copy the data from src to overlap_buf_.
-  void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length);
+  void CopyDataToOverlapBuffer(BufferInfo* src, uint64_t& offset,
+                               size_t& length);
 
   bool IsBlockSequential(const size_t& offset) {
     return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
@@ -465,6 +485,50 @@ class FilePrefetchBuffer {
     return true;
   }
 
+  // Whether we reuse the buffer provided by the file system (kFSBuffer).
+  // Until we also handle the async read case, only enable this optimization
+  // for the synchronous case when num_buffers_ = 1.
+  bool UseFSBuffer(RandomAccessFileReader* reader) {
+    return reader->file() != nullptr && !reader->use_direct_io() &&
+           fs_ != nullptr &&
+           CheckFSFeatureSupport(fs_, FSSupportedOps::kFSBuffer) &&
+           num_buffers_ == 1;
+  }
+
+  // When we are reusing the file system provided buffer, we are not concerned
+  // with alignment. However, quite a bit of prefetch code incorporates
+  // alignment, so we return an alignment of 1 (effectively unaligned) to keep
+  // that code simple.
+ size_t GetRequiredBufferAlignment(RandomAccessFileReader* reader) { + if (UseFSBuffer(reader)) { + return 1; + } + return reader->file()->GetRequiredBufferAlignment(); + } + + // Reuses the file system allocated buffer to avoid an extra copy + IOStatus FSBufferDirectRead(RandomAccessFileReader* reader, BufferInfo* buf, + const IOOptions& opts, uint64_t offset, size_t n, + Slice& result) { + FSReadRequest read_req; + read_req.offset = offset; + read_req.len = n; + read_req.scratch = nullptr; + IOStatus s = reader->MultiRead(opts, &read_req, 1, nullptr); + if (!s.ok()) { + return s; + } + s = read_req.status; + if (!s.ok()) { + return s; + } + buf->buffer_.SetBuffer(read_req.result.size(), + std::move(read_req.fs_scratch)); + buf->offset_ = offset; + buf->initial_end_offset_ = offset + read_req.result.size(); + result = read_req.result; + return s; + } + void DestroyAndClearIOHandle(BufferInfo* buf) { if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { buf->del_fn_(buf->io_handle_); @@ -474,11 +538,16 @@ class FilePrefetchBuffer { buf->async_read_in_progress_ = false; } - Status HandleOverlappingData(const IOOptions& opts, - RandomAccessFileReader* reader, uint64_t offset, - size_t length, size_t readahead_size, - bool& copy_to_third_buffer, uint64_t& tmp_offset, - size_t& tmp_length); + void HandleOverlappingSyncData(uint64_t offset, size_t length, + uint64_t& tmp_offset, size_t& tmp_length, + bool& use_overlap_buffer); + + Status HandleOverlappingAsyncData(const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t offset, size_t length, + size_t readahead_size, + bool& copy_to_third_buffer, + uint64_t& tmp_offset, size_t& tmp_length); bool TryReadFromCacheUntracked(const IOOptions& opts, RandomAccessFileReader* reader, @@ -487,11 +556,11 @@ class FilePrefetchBuffer { bool for_compaction = false); void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block, - bool refit_tail, uint64_t prev_buf_end_offset, - size_t alignment, size_t length, - size_t readahead_size, uint64_t& offset, - uint64_t& end_offset, size_t& read_len, - uint64_t& aligned_useful_len); + bool refit_tail, bool use_fs_buffer, + uint64_t prev_buf_end_offset, size_t alignment, + size_t length, size_t readahead_size, + uint64_t& offset, uint64_t& end_offset, + size_t& read_len, uint64_t& aligned_useful_len); void UpdateStats(bool found_in_buffer, size_t length_found) { if (found_in_buffer) { diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 62d44be544..534d0e22d6 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -3290,6 +3290,426 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) { /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576); } +class FSBufferPrefetchTest : public testing::Test, + public ::testing::WithParamInterface { + public: + // Mock file system supporting the kFSBuffer buffer reuse operation + class BufferReuseFS : public FileSystemWrapper { + public: + explicit BufferReuseFS(const std::shared_ptr& _target) + : FileSystemWrapper(_target) {} + ~BufferReuseFS() override {} + const char* Name() const override { return "BufferReuseFS"; } + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + public: + explicit WrappedRandomAccessFile( + std::unique_ptr& file) + : FSRandomAccessFileOwnerWrapper(std::move(file)) {} + + IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, + 
const IOOptions& options, + IODebugContext* dbg) override { + for (size_t i = 0; i < num_reqs; ++i) { + FSReadRequest& req = reqs[i]; + FSAllocationPtr buffer(new char[req.len], [](void* ptr) { + delete[] static_cast(ptr); + }); + req.fs_scratch = std::move(buffer); + req.status = Read(req.offset, req.len, options, &req.result, + static_cast(req.fs_scratch.get()), dbg); + } + return IOStatus::OK(); + } + }; + + std::unique_ptr file; + IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg); + EXPECT_OK(s); + result->reset(new WrappedRandomAccessFile(file)); + return s; + } + + void SupportedOps(int64_t& supported_ops) override { + supported_ops = 1 << FSSupportedOps::kAsyncIO; + supported_ops |= 1 << FSSupportedOps::kFSBuffer; + } + }; + + void SetUp() override { + SetupSyncPointsToMockDirectIO(); + env_ = Env::Default(); + bool use_async_prefetch = GetParam(); + if (use_async_prefetch) { + fs_ = FileSystem::Default(); + } else { + fs_ = std::make_shared(FileSystem::Default()); + } + + test_dir_ = test::PerThreadDBPath("fs_buffer_prefetch_test"); + ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + stats_ = CreateDBStatistics(); + } + + void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } + + void Write(const std::string& fname, const std::string& content) { + std::unique_ptr f; + ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); + ASSERT_OK(f->Append(content, IOOptions(), nullptr)); + ASSERT_OK(f->Close(IOOptions(), nullptr)); + } + + void Read(const std::string& fname, const FileOptions& opts, + std::unique_ptr* reader) { + std::string fpath = Path(fname); + std::unique_ptr f; + ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); + reader->reset(new RandomAccessFileReader( + std::move(f), fpath, env_->GetSystemClock().get(), + /*io_tracer=*/nullptr, stats_.get())); + } + + FileSystem* fs() { return fs_.get(); } + Statistics* stats() { return stats_.get(); } + SystemClock* clock() { return env_->GetSystemClock().get(); } + + private: + Env* env_; + std::shared_ptr fs_; + std::string test_dir_; + std::shared_ptr stats_; + + std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } +}; + +INSTANTIATE_TEST_CASE_P(FSBufferPrefetchTest, FSBufferPrefetchTest, + ::testing::Bool()); + +TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) { + // Check that the main buffer, the overlap_buf_, and the secondary buffer (in + // the case of num_buffers_ > 1) are populated correctly while reading a 32 + // KiB file + std::string fname = "fs-buffer-prefetch-stats-internals"; + Random rand(0); + std::string content = rand.RandomString(32768); + Write(fname, content); + + FileOptions opts; + std::unique_ptr r; + Read(fname, opts, &r); + + std::shared_ptr stats = CreateDBStatistics(); + ReadaheadParams readahead_params; + readahead_params.initial_readahead_size = 8192; + readahead_params.max_readahead_size = 8192; + bool use_async_prefetch = GetParam(); + size_t num_buffers = use_async_prefetch ? 2 : 1; + readahead_params.num_buffers = num_buffers; + + FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(), + stats.get()); + + int overlap_buffer_write_ct = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete", + [&](void* /*arg*/) { overlap_buffer_write_ct++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Slice result; + // Read 4096 bytes at offset 0. 
+ Status s; + std::vector> buffer_info(num_buffers); + std::pair overlap_buffer_info; + bool could_read_from_cache = + fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s); + // Platforms that don't have IO uring may not support async IO. + if (use_async_prefetch && s.IsNotSupported()) { + return; + } + ASSERT_TRUE(could_read_from_cache); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), 0); + ASSERT_EQ(strncmp(result.data(), content.substr(0, 4096).c_str(), 4096), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Cut the readahead of 8192 in half. + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Buffers: 0-8192, 8192-12288 + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 4096 + 8192 / 2); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // Read at offset 0 with length 4096 + 8192 = 12288. + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + the 8192 of prefetched data + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192); + } + + // Simulate a block cache hit + fpb.UpdateReadPattern(4096, 4096, false); + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + 4096); // 8192-12288 + ASSERT_EQ(strncmp(result.data(), content.substr(8192, 8192).c_str(), 8192), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + + if (use_async_prefetch) { + // Our buffers were 0-8192, 8192-12288 at the start so we had some + // overlapping data in the second buffer + // We clean up outdated buffers so 0-8192 gets freed for more prefetching. + // Our remaining buffer 8192-12288 has data that we want, so we can reuse it + // We end up with: 8192-20480, 20480-24576 + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 8192); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2)); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // We only have 0-12288 cached, so reading from 8192-16384 will trigger a + // prefetch up through 16384 + 8192 = 24576. 
+ // Overlap buffer reuses bytes 8192 to 12288 + ASSERT_EQ(overlap_buffer_info.first, 8192); + ASSERT_EQ(overlap_buffer_info.second, 8192); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // We spill to the overlap buffer so the remaining buffer only has the + // missing and prefetched part + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + } + + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 12288, 4096, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 1); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + 4096); // 12288-16384 + ASSERT_EQ(strncmp(result.data(), content.substr(12288, 4096).c_str(), 4096), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + + if (use_async_prefetch) { + // Same as before: 8192-20480, 20480-24576 (cache hit in first buffer) + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 8192); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2); + ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2)); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2); + } else { + // The main buffer has 12288-24576, so 12288-16384 is a cache hit. + // Overlap buffer does not get used + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + ASSERT_EQ(overlap_buffer_info.first, 8192); + ASSERT_EQ(overlap_buffer_info.second, 8192); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // Main buffer stays the same + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + } + + // Read from 16000-26000 (start and end do not meet normal alignment) + ASSERT_TRUE( + fpb.TryReadFromCache(IOOptions(), r.get(), 16000, 10000, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0); + ASSERT_EQ( + stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), + /* 24576(end offset of the buffer) - 16000(requested offset) =*/8576); + ASSERT_EQ(strncmp(result.data(), content.substr(16000, 10000).c_str(), 10000), + 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Overlap buffer reuses bytes 16000 to 20480 + ASSERT_EQ(overlap_buffer_info.first, 16000); + ASSERT_EQ(overlap_buffer_info.second, 10000); + // First 2 writes are reusing existing 2 buffers. Last write fills in + // what could not be found in either. 
+ ASSERT_EQ(overlap_buffer_write_ct, 3); + ASSERT_EQ(std::get<0>(buffer_info[0]), 24576); + ASSERT_EQ(std::get<1>(buffer_info[0]), 32768 - 24576); + ASSERT_EQ(std::get<0>(buffer_info[1]), 32768); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); + ASSERT_TRUE(std::get<2>( + buffer_info[1])); // in progress async request (otherwise we should not + // be getting 4096 for the size) + } else { + // Overlap buffer reuses bytes 16000 to 24576 + ASSERT_EQ(overlap_buffer_info.first, 16000); + ASSERT_EQ(overlap_buffer_info.second, 10000); + ASSERT_EQ(overlap_buffer_write_ct, 4); + // Even if you try to readahead to offset 16000 + 10000 + 8192, there are + // only 32768 bytes in the original file + ASSERT_EQ(std::get<0>(buffer_info[0]), 12288 + 12288); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); + } +} + +TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) { + // Check that the main buffer, the overlap_buf_, and the secondary buffer (in + // the case of num_buffers_ > 1) are populated correctly + // while reading with no regard to alignment + std::string fname = "fs-buffer-prefetch-unaligned-reads"; + Random rand(0); + std::string content = rand.RandomString(1000); + Write(fname, content); + + FileOptions opts; + std::unique_ptr r; + Read(fname, opts, &r); + + std::shared_ptr stats = CreateDBStatistics(); + ReadaheadParams readahead_params; + // Readahead size will double each time + readahead_params.initial_readahead_size = 5; + readahead_params.max_readahead_size = 100; + bool use_async_prefetch = GetParam(); + size_t num_buffers = use_async_prefetch ? 2 : 1; + readahead_params.num_buffers = num_buffers; + FilePrefetchBuffer fpb(readahead_params, true, false, fs(), clock(), + stats.get()); + + int overlap_buffer_write_ct = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete", + [&](void* /*arg*/) { overlap_buffer_write_ct++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Slice result; + // Read 3 bytes at offset 5 + Status s; + std::vector> buffer_info(num_buffers); + std::pair overlap_buffer_info; + bool could_read_from_cache = + fpb.TryReadFromCache(IOOptions(), r.get(), 5, 3, &result, &s); + // Platforms that don't have IO uring may not support async IO. 
+ if (use_async_prefetch && s.IsNotSupported()) { + return; + } + ASSERT_TRUE(could_read_from_cache); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(5, 3).c_str(), 3), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // With async prefetching, we still try to align to 4096 bytes, so + // our main buffer read and secondary buffer prefetch are rounded up + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + // This buffer won't actually get filled up with data since there is nothing + // after 1000 + ASSERT_EQ(std::get<0>(buffer_info[1]), 4096); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); + ASSERT_TRUE(std::get<2>(buffer_info[1])); // in progress async request + } else { + // Overlap buffer is not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + 5 of prefetched data (5 - 13) + ASSERT_EQ(std::get<0>(buffer_info[0]), 5); + ASSERT_EQ(std::get<1>(buffer_info[0]), 3 + 5); + } + + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16, 7, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(16, 7).c_str(), 7), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + // The remaining requests will be the same when use_async_prefetch is true + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Complete miss: read 7 bytes at offset 16 + // Overlap buffer is not used (no partial hit) + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer contains the requested data + 10 of prefetched data (16 - 33) + ASSERT_EQ(std::get<0>(buffer_info[0]), 16); + ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10); + } + + // Go backwards + if (use_async_prefetch) { + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s)); + } else { + // TryReadFromCacheUntracked returns false since the offset + // requested is less than the start of our buffer + ASSERT_FALSE( + fpb.TryReadFromCache(IOOptions(), r.get(), 10, 8, &result, &s)); + } + ASSERT_EQ(s, Status::OK()); + + ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 27, 6, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(27, 6).c_str(), 6), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Complete hit + // Overlap buffer still not used + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + // Main buffer unchanged + ASSERT_EQ(std::get<0>(buffer_info[0]), 16); + ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10); + } + + 
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 30, 20, &result, &s)); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(strncmp(result.data(), content.substr(30, 20).c_str(), 20), 0); + fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info); + fpb.TEST_GetBufferOffsetandSize(buffer_info); + if (use_async_prefetch) { + // Complete hit since we have the entire file loaded in the main buffer + ASSERT_EQ(overlap_buffer_info.first, 0); + ASSERT_EQ(overlap_buffer_info.second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), 0); + ASSERT_EQ(std::get<1>(buffer_info[0]), 1000); + } else { + // Partial hit (overlapping with end of main buffer) + // Overlap buffer is used because we already had 30-33 + ASSERT_EQ(overlap_buffer_info.first, 30); + ASSERT_EQ(overlap_buffer_info.second, 20); + ASSERT_EQ(overlap_buffer_write_ct, 2); + // Main buffer has up to offset 50 + 20 of prefetched data + ASSERT_EQ(std::get<0>(buffer_info[0]), 33); + ASSERT_EQ(std::get<1>(buffer_info[0]), (50 - 33) + 20); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index e2f757f5bf..46f5d1c262 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -199,7 +199,7 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, buf.Read(scratch, offset_advance, res_len); } else { scratch = buf.BufferStart() + offset_advance; - aligned_buf->reset(buf.Release()); + *aligned_buf = buf.Release(); } } *result = Slice(scratch, res_len); @@ -384,7 +384,7 @@ IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts, scratch += r.len; } - aligned_buf->reset(buf.Release()); + *aligned_buf = buf.Release(); fs_reqs = aligned_reqs.data(); num_fs_reqs = aligned_reqs.size(); } @@ -598,8 +598,7 @@ void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req, // Set aligned_buf provided by user without additional copy. user_req.scratch = read_async_info->buf_.BufferStart() + offset_advance_len; - read_async_info->user_aligned_buf_->reset( - read_async_info->buf_.Release()); + *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release(); } user_req.result = Slice(user_req.scratch, res_len); } else { diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 3155136776..3f590be1ca 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -25,7 +25,7 @@ class Statistics; class HistogramImpl; class SystemClock; -using AlignedBuf = std::unique_ptr; +using AlignedBuf = FSAllocationPtr; // Align the request r according to alignment and return the aligned result. FSReadRequest Align(const FSReadRequest& r, size_t alignment); diff --git a/table/table_test.cc b/table/table_test.cc index d4e4b3936d..b198baada4 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3252,14 +3252,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(1); + std::vector> buffer_info(1); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); // It won't prefetch the data of cache hit. // One block data. 
- ASSERT_EQ(buffer_info[0].second, 4096); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3290,14 +3290,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(1); + std::vector> buffer_info(1); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); // It won't prefetch the data of cache hit. // 3 blocks data. - ASSERT_EQ(buffer_info[0].second, 12288); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 12288); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); for (; kv_iter != kvmap.end() && iter->Valid(); kv_iter++) { ASSERT_EQ(iter->key(), kv_iter->first); @@ -3313,8 +3313,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { // Second Prefetch. prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].second, 20480); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 20480); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3401,13 +3401,13 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); - ASSERT_EQ(buffer_info[1].second, 0); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); + ASSERT_EQ(std::get<1>(buffer_info[1]), 0); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 2); @@ -3440,21 +3440,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { FilePrefetchBuffer* prefetch_buffer = (static_cast(iter.get())) ->prefetch_buffer(); - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); { // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. 
InternalKey ikey_tmp("00000360", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3493,21 +3493,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { ->prefetch_buffer(); { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. InternalKey ikey_tmp("00000540", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3527,21 +3527,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 8192); // 2nd Buffer Verification. InternalKey ikey_tmp("00000585", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3561,21 +3561,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); // 2nd Buffer Verification. 
InternalKey ikey_tmp("00000615", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 4096); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 1); @@ -3595,21 +3595,21 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { } { - std::vector> buffer_info(2); + std::vector> buffer_info(2); prefetch_buffer->TEST_GetBufferOffsetandSize(buffer_info); // 1st Buffer Verification. bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle); - ASSERT_EQ(buffer_info[0].first, block_handle.offset()); - ASSERT_EQ(buffer_info[0].second, 4096); + ASSERT_EQ(std::get<0>(buffer_info[0]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[0]), 4096); // 2nd Buffer Verification. InternalKey ikey_tmp("00000630", 0, kTypeValue); bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(), block_handle); - ASSERT_EQ(buffer_info[1].first, block_handle.offset()); - ASSERT_EQ(buffer_info[1].second, 8192); + ASSERT_EQ(std::get<0>(buffer_info[1]), block_handle.offset()); + ASSERT_EQ(std::get<1>(buffer_info[1]), 8192); ASSERT_EQ(options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED), 0); diff --git a/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md b/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md new file mode 100644 index 0000000000..f35735ccc4 --- /dev/null +++ b/unreleased_history/performance_improvements/reuse_file_system_buffer_prefetch.md @@ -0,0 +1 @@ +* Enable reuse of file system allocated buffer for synchronous prefetching. diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index acab56c215..dbe34b6da0 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -12,7 +12,7 @@ #include #include "port/port.h" - +#include "rocksdb/file_system.h" namespace ROCKSDB_NAMESPACE { // This file contains utilities to handle the alignment of pages and buffers. @@ -56,7 +56,7 @@ inline size_t Rounddown(size_t x, size_t y) { return (x / y) * y; } // copy_offset, copy_len); class AlignedBuffer { size_t alignment_; - std::unique_ptr buf_; + FSAllocationPtr buf_; size_t capacity_; size_t cursize_; char* bufstart_; @@ -100,11 +100,11 @@ class AlignedBuffer { void Clear() { cursize_ = 0; } - char* Release() { + FSAllocationPtr Release() { cursize_ = 0; capacity_ = 0; bufstart_ = nullptr; - return buf_.release(); + return std::move(buf_); } void Alignment(size_t alignment) { @@ -113,6 +113,17 @@ class AlignedBuffer { alignment_ = alignment; } + // Points the buffer to new_buf (taking ownership) without allocating extra + // memory or performing any data copies. This method is called when we want to + // reuse the buffer provided by the file system + void SetBuffer(size_t size, FSAllocationPtr&& new_buf) { + alignment_ = 1; + capacity_ = size; + cursize_ = size; + bufstart_ = reinterpret_cast(new_buf.get()); + buf_ = std::move(new_buf); + } + // Allocates a new buffer and sets the start position to the first aligned // byte. 
  // byte.
@@ -156,7 +167,11 @@ class AlignedBuffer {
 
     bufstart_ = new_bufstart;
     capacity_ = new_capacity;
-    buf_.reset(new_buf);
+    // buf_ is an FSAllocationPtr, which takes a custom deleter, so we simply
+    // wrap the regular array delete that would otherwise have been called.
+    buf_ = std::unique_ptr<void, std::function<void(void*)>>(
+        static_cast<void*>(new_buf),
+        [](void* p) { delete[] static_cast<char*>(p); });
   }
 
   // Append to the buffer.
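
Reviewer note (not part of the patch): the change hinges on transferring ownership of the file-system-allocated buffer (FSReadRequest::fs_scratch, an FSAllocationPtr) into AlignedBuffer via SetBuffer(), so the prefetch path hands out the FS buffer without an extra memcpy. The standalone C++ sketch below illustrates that ownership chain under stated assumptions: AdoptedBuffer and FakeFSRead are made-up stand-ins for AlignedBuffer::SetBuffer and a kFSBuffer-capable read; only the FSAllocationPtr alias mirrors the real typedef in rocksdb/file_system.h.

#include <cstddef>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>

// Same shape as ROCKSDB_NAMESPACE::FSAllocationPtr: a type-erased owning
// pointer whose deleter knows how the file system allocated the memory.
using FSAllocationPtr = std::unique_ptr<void, std::function<void(void*)>>;

// Hypothetical stand-in for the adoption side (AlignedBuffer::SetBuffer in the
// patch): it takes ownership of the FS allocation instead of copying it.
struct AdoptedBuffer {
  FSAllocationPtr owned;
  char* data = nullptr;
  size_t size = 0;
  void Adopt(size_t n, FSAllocationPtr&& buf) {
    data = static_cast<char*>(buf.get());
    size = n;
    owned = std::move(buf);  // zero-copy: just transfer ownership
  }
};

// Hypothetical stand-in for a file system read that fills fs_scratch with a
// buffer it allocated itself, packaged with the matching deleter.
FSAllocationPtr FakeFSRead(const char* src, size_t n) {
  FSAllocationPtr buf(new char[n],
                      [](void* p) { delete[] static_cast<char*>(p); });
  std::memcpy(buf.get(), src, n);  // the "device" fills its own buffer
  return buf;
}

int main() {
  const char payload[] = "prefetched bytes";
  AdoptedBuffer buf;
  buf.Adopt(sizeof(payload), FakeFSRead(payload, sizeof(payload)));
  std::cout << buf.data << " (" << buf.size << " bytes, no extra copy)\n";
  return 0;  // the deleter registered by FakeFSRead frees the allocation
}

Because the deleter travels with the pointer, the prefetch buffer can later release the same allocation (AlignedBuffer::Release now returns an FSAllocationPtr) without needing to know how the file system allocated it.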