Fix Assertion `roundup_len2 >= alignment' failed in crash tests (#11852)

Summary:
When auto_readahead_size is enabled with async_io, the first buffer prefetches the data during a seek: the current block plus readahead up to upper_bound. There can be cases where
1.  the first buffer has prefetched all the data up to upper_bound, or
2.  the first buffer already has the data from a previous seek call,
and the second buffer then prefetches further, past upper_bound, leading to alignment issues and the failing assertion roundup_len2 >= alignment.

This PR fixes that assertion: the second buffer no longer goes for prefetching if the first buffer has already prefetched up to upper_bound.
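
For illustration, below is a minimal standalone sketch of the failure and of the skip that fixes it. This is not the RocksDB code: PlanSecondPrefetch, kAlignment, and the upper-bound trimming shown are simplified stand-ins for the real FilePrefetchBuffer logic.

#include <cassert>
#include <cstdint>
#include <iostream>

namespace {

constexpr uint64_t kAlignment = 4096;

// Round x up to the next multiple of alignment.
uint64_t Roundup(uint64_t x, uint64_t alignment) {
  return ((x + alignment - 1) / alignment) * alignment;
}

// Hypothetical helper modeling the second-buffer planning: the readahead is
// trimmed so it never goes past upper_bound. Returns the length the second
// buffer would read.
uint64_t PlanSecondPrefetch(uint64_t rounddown_start2, uint64_t readahead_size,
                            uint64_t upper_bound, bool skip_out_of_bound) {
  // With the fix, an out-of-bound start means: do not prefetch at all
  // (equivalent to the IsOffsetOutOfBound() check added in this PR).
  if (skip_out_of_bound && upper_bound > 0 && rounddown_start2 >= upper_bound) {
    return 0;
  }
  // Trim the readahead so it does not cross upper_bound
  // (the auto_readahead_size behavior, simplified).
  uint64_t prefetch_size = readahead_size;
  if (upper_bound > 0 && rounddown_start2 + prefetch_size > upper_bound) {
    prefetch_size =
        upper_bound > rounddown_start2 ? upper_bound - rounddown_start2 : 0;
  }
  uint64_t roundup_end2 = Roundup(rounddown_start2 + prefetch_size, kAlignment);
  uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
  assert(roundup_len2 >= kAlignment);  // The assertion that used to fire.
  return roundup_len2;
}

}  // namespace

int main() {
  // Seek of 4000 bytes at offset 3000 with upper_bound 8000: the first buffer
  // reads up to 8000, so the second buffer would start at 8000, which is
  // already at the upper bound.
  uint64_t start2 = 8000;
  std::cout << PlanSecondPrefetch(start2, /*readahead_size=*/8192,
                                  /*upper_bound=*/8000,
                                  /*skip_out_of_bound=*/true)
            << "\n";  // Prints 0: the second buffer is skipped.
  // Without the skip, the trimmed prefetch size becomes 0 and
  // roundup_len2 = Roundup(8000, 4096) - 8000 = 192 < 4096, so the assertion
  // fails in a debug build (uncomment to reproduce):
  // PlanSecondPrefetch(start2, 8192, 8000, /*skip_out_of_bound=*/false);
  return 0;
}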

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11852

Test Plan:
- Added new unit test that failed without this fix.
- Crash tests passed locally.

Reviewed By: pdillinger

Differential Revision: D49384138

Pulled By: akankshamahajan15

fbshipit-source-id: 54417e909e4d986f1e5a17dbaea059cd4962fd4d
Author: akankshamahajan
Date: 2023-09-20 16:13:20 -07:00 (committed by Facebook GitHub Bot)
parent 089070cb36
commit c1a97fe1f6
4 changed files with 104 additions and 27 deletions


@@ -364,8 +364,11 @@ Status FilePrefetchBuffer::HandleOverlappingData(
   size_t second_size = bufs_[second].async_read_in_progress_
                            ? bufs_[second].async_req_len_
                            : bufs_[second].buffer_.CurrentSize();
-  if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size) {
-    uint64_t rounddown_start = bufs_[second].offset_ + second_size;
+  uint64_t rounddown_start = bufs_[second].offset_ + second_size;
+  // Second buffer might be out of bound if first buffer already prefetched
+  // that data.
+  if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size &&
+      !IsOffsetOutOfBound(rounddown_start)) {
     uint64_t roundup_end =
         Roundup(rounddown_start + readahead_size, alignment);
     uint64_t roundup_len = roundup_end - rounddown_start;
@@ -562,20 +565,24 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
       roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
     }
-    uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
-    uint64_t chunk_len2 = 0;
-    CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
-                          false /*refit_tail*/, chunk_len2);
-    assert(chunk_len2 == 0);
-    // Update the buffer offset.
-    bufs_[second].offset_ = rounddown_start2;
-    assert(roundup_len2 >= chunk_len2);
-    uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
-    s = ReadAsync(opts, reader, read_len2, rounddown_start2, second);
-    if (!s.ok()) {
-      DestroyAndClearIOHandle(second);
-      bufs_[second].buffer_.Clear();
-      return s;
+    // Second buffer might be out of bound if first buffer already prefetched
+    // that data.
+    if (!IsOffsetOutOfBound(rounddown_start2)) {
+      uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
+      uint64_t chunk_len2 = 0;
+      CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
+                            false /*refit_tail*/, chunk_len2);
+      assert(chunk_len2 == 0);
+      // Update the buffer offset.
+      bufs_[second].offset_ = rounddown_start2;
+      assert(roundup_len2 >= chunk_len2);
+      uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
+      s = ReadAsync(opts, reader, read_len2, rounddown_start2, second);
+      if (!s.ok()) {
+        DestroyAndClearIOHandle(second);
+        bufs_[second].buffer_.Clear();
+        return s;
+      }
     }
   }
@@ -925,17 +932,22 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
       rounddown_start2 = roundup_end1;
     }
-    roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
-    uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
-    assert(roundup_len2 >= alignment);
-    CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
-                          false, chunk_len2);
-    assert(chunk_len2 == 0);
-    assert(roundup_len2 >= chunk_len2);
-    read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
-    // Update the buffer offset.
-    bufs_[second].offset_ = rounddown_start2;
+    // Second buffer might be out of bound if first buffer already prefetched
+    // that data.
+    if (!IsOffsetOutOfBound(rounddown_start2)) {
+      roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment);
+      uint64_t roundup_len2 = roundup_end2 - rounddown_start2;
+      assert(roundup_len2 >= alignment);
+
+      CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second,
+                            false, chunk_len2);
+      assert(chunk_len2 == 0);
+      assert(roundup_len2 >= chunk_len2);
+      read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
+      // Update the buffer offset.
+      bufs_[second].offset_ = rounddown_start2;
+    }
   }
   if (read_len1) {


@@ -441,6 +441,13 @@ class FilePrefetchBuffer {
     }
   }

+  inline bool IsOffsetOutOfBound(uint64_t offset) {
+    if (upper_bound_offset_ > 0) {
+      return (offset >= upper_bound_offset_);
+    }
+    return false;
+  }
+
   std::vector<BufferInfo> bufs_;
   // curr_ represents the index for bufs_ indicating which buffer is being
   // consumed currently.


@@ -3117,7 +3117,64 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
   // Length should be 4000.
   ASSERT_EQ(async_result.size(), 4000);
   // Data correctness.
-  Slice result(content.c_str() + 3000, 4000);
+  Slice result(&content[3000], 4000);
   ASSERT_EQ(result.size(), 4000);
   ASSERT_EQ(result, async_result);
 }
+
+// This test checks if during seek in async_io, if first buffer already
+// prefetched the data till upper_bound offset, second buffer shouldn't go for
+// prefetching.
+TEST_F(FilePrefetchBufferTest, IterateUpperBoundTest1) {
+  std::string fname = "iterate-upperbound-test1";
+  Random rand(0);
+  std::string content = rand.RandomString(32768);
+  Write(fname, content);
+
+  FileOptions opts;
+  std::unique_ptr<RandomAccessFileReader> r;
+  Read(fname, opts, &r);
+
+  FilePrefetchBuffer fpb(
+      /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
+      /*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
+      /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0,
+      /*upper_bound_offset=*/8000, fs());
+
+  int read_async_called = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::ReadAsync",
+      [&](void* /*arg*/) { read_async_called++; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Slice async_result;
+  // Simulate a seek of 4000 bytes at offset 3000. Due to the readahead
+  // settings, it will do 1 read of 4000+1000 (till 8000 - upper bound).
+  Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 3000, 4000, &async_result);
+
+  // Platforms that don't have IO uring may not support async IO
+  if (s.IsNotSupported()) {
+    return;
+  }
+
+  ASSERT_TRUE(s.IsTryAgain());
+  IOOptions io_opts;
+  io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
+  ASSERT_TRUE(fpb.TryReadFromCacheAsync(io_opts, r.get(), /*offset=*/3000,
+                                        /*length=*/4000, &async_result, &s));
+  // No sync call should be made.
+  HistogramData sst_read_micros;
+  stats()->histogramData(SST_READ_MICROS, &sst_read_micros);
+  ASSERT_EQ(sst_read_micros.count, 0);
+
+  // Number of async calls should be 1.
+  // No Prefetching should happen in second buffer as first buffer has already
+  // prefetched till offset.
+  ASSERT_EQ(read_async_called, 1);
+  // Length should be 4000.
+  ASSERT_EQ(async_result.size(), 4000);
+  // Data correctness.
+  Slice result(&content[3000], 4000);
+  ASSERT_EQ(result.size(), 4000);
+  ASSERT_EQ(result, async_result);
+}


@@ -0,0 +1 @@
+Fix an assertion fault during seek with async_io when readahead trimming is enabled.