From ff9ad2c39bd1ce6b8b7e2aba4834357115fd9d9d Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Tue, 1 Nov 2022 16:06:51 -0700 Subject: [PATCH] Fix async_io failures in case there is error in reading data (#10890) Summary: Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if data is overlapping between two buffers. If there is IOError while reading the data, it leads to empty buffer and other buffer already in progress of async read goes again for reading causing the error. Fix: Added check to abort IO in second buffer if curr_ got empty. This PR also fixes db_stress failures which happened when buffers are not aligned. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10890 Test Plan: - Ran make crash_test -j32 with async_io enabled. - Ran benchmarks to make sure there is no regression. Reviewed By: anand1976 Differential Revision: D40881731 Pulled By: akankshamahajan15 fbshipit-source-id: 39fcf2134c7b1bbb08415ede3e1ef261ac2dbc58 --- HISTORY.md | 1 + file/file_prefetch_buffer.cc | 201 +++++++++++++++++++++++------------ file/file_prefetch_buffer.h | 29 +++++ 3 files changed, 165 insertions(+), 66 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2dfce83a1a..b57b84374c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### Bug Fixes * Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. +* Fix memory corruption error in scans if async_io is enabled. Memory corruption happened if there is IOError while reading the data leading to empty buffer and other buffer already in progress of async read goes again for reading. ### New Features * Add basic support for user-defined timestamp to Merge (#10819). diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 00a76aed53..9222acdfa7 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -225,10 +225,8 @@ void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { bufs_[second].async_read_in_progress_ = false; } - if (bufs_[curr_].io_handle_ == nullptr && - bufs_[curr_].async_read_in_progress_) { + if (bufs_[curr_].io_handle_ == nullptr) { bufs_[curr_].async_read_in_progress_ = false; - curr_ = curr_ ^ 1; } } @@ -268,16 +266,36 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) { bufs_[second].buffer_.Clear(); } - // If data starts from second buffer, make it curr_. Second buffer can be - // either partial filled or full. - if (!bufs_[second].async_read_in_progress_ && DoesBufferContainData(second) && - IsOffsetInBuffer(offset, second)) { - // Clear the curr_ as buffers have been swapped and curr_ contains the - // outdated data and switch the buffers. - if (!bufs_[curr_].async_read_in_progress_) { - bufs_[curr_].buffer_.Clear(); + { + // In case buffers do not align, reset second buffer. This can happen in + // case readahead_size is set. + if (!bufs_[second].async_read_in_progress_ && + !bufs_[curr_].async_read_in_progress_) { + if (DoesBufferContainData(curr_)) { + if (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() != + bufs_[second].offset_) { + bufs_[second].buffer_.Clear(); + } + } else { + if (!IsOffsetInBuffer(offset, second)) { + bufs_[second].buffer_.Clear(); + } + } + } + } + + // If data starts from second buffer, make it curr_. Second buffer can be + // either partial filled, full or async read is in progress. + if (bufs_[second].async_read_in_progress_) { + if (IsOffsetInBufferWithAsyncProgress(offset, second)) { + curr_ = curr_ ^ 1; + } + } else { + if (DoesBufferContainData(second) && IsOffsetInBuffer(offset, second)) { + assert(bufs_[curr_].async_read_in_progress_ || + bufs_[curr_].buffer_.CurrentSize() == 0); + curr_ = curr_ ^ 1; } - curr_ = curr_ ^ 1; } } @@ -300,53 +318,16 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { UpdateBuffersIfNeeded(offset); } -// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is -// called. When buffers are switched, we clear the curr_ buffer as we assume the -// data has been consumed because of sequential reads. -// Data in buffers will always be sequential with curr_ following second and -// not vice versa. -// -// Scenarios for prefetching asynchronously: -// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes -// synchronously in curr_ and prefetch readahead_size_/2 async in second -// buffer. -// Case2: If second buffer has partial or full data, make it current and -// prefetch readahead_size_/2 async in second buffer. In case of -// partial data, prefetch remaining bytes from size n synchronously to -// fulfill the requested bytes request. -// Case3: If curr_ has partial data, prefetch remaining bytes from size n -// synchronously in curr_ to fulfill the requested bytes request and -// prefetch readahead_size_/2 bytes async in second buffer. -// Case4: (Special case) If data is in both buffers, copy requested data from -// curr_, send async request on curr_, wait for poll to fill second -// buffer (if any), and copy remaining data from second buffer to third -// buffer. -Status FilePrefetchBuffer::PrefetchAsyncInternal( +Status FilePrefetchBuffer::HandleOverlappingData( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, - size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, - bool& copy_to_third_buffer) { - if (!enable_) { - return Status::OK(); - } - - TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start"); - - size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t length, size_t readahead_size, + Env::IOPriority /*rate_limiter_priority*/, bool& copy_to_third_buffer, + uint64_t& tmp_offset, size_t& tmp_length) { Status s; - uint64_t tmp_offset = offset; - size_t tmp_length = length; - - // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with - // data. - { - if (!explicit_prefetch_submitted_) { - AbortIOIfNeeded(offset); - } - UpdateBuffersIfNeeded(offset); - } + size_t alignment = reader->file()->GetRequiredBufferAlignment(); uint32_t second = curr_ ^ 1; - // 2. If data is overlapping over two buffers, copy the data from curr_ and + // If data is overlapping over two buffers, copy the data from curr_ and // call ReadAsync on curr_. if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && IsOffsetInBuffer(offset, curr_) && @@ -391,21 +372,80 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( } curr_ = curr_ ^ 1; } + return s; +} +// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is +// called. When buffers are switched, we clear the curr_ buffer as we assume the +// data has been consumed because of sequential reads. +// Data in buffers will always be sequential with curr_ following second and +// not vice versa. +// +// Scenarios for prefetching asynchronously: +// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes +// synchronously in curr_ and prefetch readahead_size_/2 async in second +// buffer. +// Case2: If second buffer has partial or full data, make it current and +// prefetch readahead_size_/2 async in second buffer. In case of +// partial data, prefetch remaining bytes from size n synchronously to +// fulfill the requested bytes request. +// Case3: If curr_ has partial data, prefetch remaining bytes from size n +// synchronously in curr_ to fulfill the requested bytes request and +// prefetch readahead_size_/2 bytes async in second buffer. +// Case4: (Special case) If data is in both buffers, copy requested data from +// curr_, send async request on curr_, wait for poll to fill second +// buffer (if any), and copy remaining data from second buffer to third +// buffer. +Status FilePrefetchBuffer::PrefetchAsyncInternal( + const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, + size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer) { + if (!enable_) { + return Status::OK(); + } + + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start"); + + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + Status s; + uint64_t tmp_offset = offset; + size_t tmp_length = length; + + // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with + // data. + if (!explicit_prefetch_submitted_) { + AbortIOIfNeeded(offset); + } + UpdateBuffersIfNeeded(offset); + + // 2. Handle overlapping data over two buffers. If data is overlapping then + // during this call: + // - data from curr_ is copied into third buffer, + // - curr_ is send for async prefetching of further data if second buffer + // contains remaining requested data or in progress for async prefetch, + // - switch buffers and curr_ now points to second buffer to copy remaining + // data. + s = HandleOverlappingData(opts, reader, offset, length, readahead_size, + rate_limiter_priority, copy_to_third_buffer, + tmp_offset, tmp_length); + if (!s.ok()) { + return s; + } // 3. Call Poll only if data is needed for the second buffer. - // - Return if whole data is in curr_ and second buffer in progress. + // - Return if whole data is in curr_ and second buffer is in progress or + // already full. // - If second buffer is empty, it will go for ReadAsync for second buffer. if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, length, curr_)) { // Whole data is in curr_. UpdateBuffersIfNeeded(offset); - second = curr_ ^ 1; - if (bufs_[second].async_read_in_progress_) { + if (!IsSecondBuffEligibleForPrefetching()) { return s; } } else { + // After poll request, curr_ might be empty because of IOError in + // callback while reading or may contain required data. PollAndUpdateBuffersIfNeeded(offset); - second = curr_ ^ 1; } if (copy_to_third_buffer) { @@ -427,19 +467,42 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( if (explicit_prefetch_submitted_) { return s; } + if (!IsSecondBuffEligibleForPrefetching()) { + return s; + } + } + + uint32_t second = curr_ ^ 1; + assert(!bufs_[curr_].async_read_in_progress_); + + // In case because of some IOError curr_ got empty, abort IO for second as + // well. Otherwise data might not align if more data needs to be read in curr_ + // which might overlap with second buffer. + if (!DoesBufferContainData(curr_) && bufs_[second].async_read_in_progress_) { + if (bufs_[second].io_handle_ != nullptr) { + std::vector handles; + handles.emplace_back(bufs_[second].io_handle_); + { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status status = fs_->AbortIO(handles); + assert(status.ok()); + } + } + DestroyAndClearIOHandle(second); + bufs_[second].buffer_.Clear(); } // 5. Data is overlapping i.e. some of the data has been copied to third - // buffer - // and remaining will be updated below. - if (copy_to_third_buffer) { + // buffer and remaining will be updated below. + if (copy_to_third_buffer && DoesBufferContainData(curr_)) { CopyDataToBuffer(curr_, offset, length); // Length == 0: All the requested data has been copied to third buffer and // it has already gone for async prefetching. It can return without doing // anything further. - // Length > 0: More data needs to be consumed so it will continue async and - // sync prefetching and copy the remaining data to third buffer in the end. + // Length > 0: More data needs to be consumed so it will continue async + // and sync prefetching and copy the remaining data to third buffer in the + // end. if (length == 0) { return s; } @@ -458,6 +521,9 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( uint64_t chunk_len1 = 0; uint64_t read_len1 = 0; + assert(!bufs_[second].async_read_in_progress_ && + !DoesBufferContainData(second)); + // For length == 0, skip the synchronous prefetching. read_len1 will be 0. if (length > 0) { CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_, @@ -594,8 +660,11 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( } if (explicit_prefetch_submitted_) { - if (prev_offset_ != offset) { - // Random offset called. So abort the IOs. + // explicit_prefetch_submitted_ is special case where it expects request + // submitted in PrefetchAsync should match with this request. Otherwise + // buffers will be outdated. + // Random offset called. So abort the IOs. + if (bufs_[curr_].offset_ != offset) { AbortAllIOs(); bufs_[curr_].buffer_.Clear(); bufs_[curr_ ^ 1].buffer_.Clear(); diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 00ea8ae645..bd8ba9d7d4 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once + #include #include #include @@ -363,6 +364,27 @@ class FilePrefetchBuffer { bufs_[index].io_handle_ != nullptr && offset >= bufs_[index].offset_ + bufs_[index].async_req_len_); } + bool IsOffsetInBufferWithAsyncProgress(uint64_t offset, uint32_t index) { + return (bufs_[index].async_read_in_progress_ && + offset >= bufs_[index].offset_ && + offset < bufs_[index].offset_ + bufs_[index].async_req_len_); + } + + bool IsSecondBuffEligibleForPrefetching() { + uint32_t second = curr_ ^ 1; + if (bufs_[second].async_read_in_progress_) { + return false; + } + assert(!bufs_[curr_].async_read_in_progress_); + + if (DoesBufferContainData(curr_) && DoesBufferContainData(second) && + (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() == + bufs_[second].offset_)) { + return false; + } + bufs_[second].buffer_.Clear(); + return true; + } void DestroyAndClearIOHandle(uint32_t index) { if (bufs_[index].io_handle_ != nullptr && bufs_[index].del_fn_ != nullptr) { @@ -373,6 +395,13 @@ class FilePrefetchBuffer { bufs_[index].async_read_in_progress_ = false; } + Status HandleOverlappingData(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, + size_t length, size_t readahead_size, + Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer, uint64_t& tmp_offset, + size_t& tmp_length); + std::vector bufs_; // curr_ represents the index for bufs_ indicating which buffer is being // consumed currently.