From 4868c10b44e8bf5174210c212b01824e29c724bf Mon Sep 17 00:00:00 2001
From: anand76
Date: Mon, 18 Mar 2024 16:16:05 -0700
Subject: [PATCH] Retry block reads on checksum mismatch (#12427)

Summary:
On file systems that support storage level data checksum and reconstruction,
retry SST block reads for point lookups, scans, and flush and compaction if
there's a checksum mismatch on the initial read. A file system can indicate
its support by setting the `FSSupportedOps::kVerifyAndReconstructRead` bit
in `SupportedOps`.

Tests:
Add new unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12427

Reviewed By: ajkr

Differential Revision: D55025941

Pulled By: anand1976

fbshipit-source-id: dbd990cb75e03f756c8a66d42956f645c0b6d55e
---
 db/db_io_failure_test.cc                       | 218 +++++++++++++++++
 include/rocksdb/file_system.h                  |   3 +-
 include/rocksdb/statistics.h                   |   3 +
 java/rocksjni/portal.h                         |   4 +
 .../src/main/java/org/rocksdb/TickerType.java  |   2 +
 monitoring/statistics.cc                       |   1 +
 table/block_based/block_based_table_reader.cc  |  13 ++
 .../block_based_table_reader_sync_and_async.h  |  30 ++-
 table/block_fetcher.cc                         | 219 ++++++++++--------
 table/block_fetcher.h                          |   6 +
 .../new_features/retry_on_corruption.md        |   1 +
 11 files changed, 398 insertions(+), 102 deletions(-)
 create mode 100644 unreleased_history/new_features/retry_on_corruption.md

diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc
index e79272ea7e..3b4509c8cd 100644
--- a/db/db_io_failure_test.cc
+++ b/db/db_io_failure_test.cc
@@ -13,6 +13,100 @@
 #include "util/random.h"

 namespace ROCKSDB_NAMESPACE {
+namespace {
+// A wrapper that allows injection of errors.
+class CorruptionFS : public FileSystemWrapper {
+ public:
+  bool writable_file_error_;
+  int num_writable_file_errors_;
+
+  explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
+      : FileSystemWrapper(_target),
+        writable_file_error_(false),
+        num_writable_file_errors_(0),
+        corruption_trigger_(INT_MAX),
+        read_count_(0),
+        rnd_(300) {}
+  ~CorruptionFS() override {
+    // Assert that the corruption was reset, which means it got triggered
+    assert(corruption_trigger_ == INT_MAX);
+  }
+  const char* Name() const override { return "ErrorEnv"; }
+
+  IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override {
+    result->reset();
+    if (writable_file_error_) {
+      ++num_writable_file_errors_;
+      return IOStatus::IOError(fname, "fake error");
+    }
+    return target()->NewWritableFile(fname, opts, result, dbg);
+  }
+
+  void SetCorruptionTrigger(const int trigger) {
+    corruption_trigger_ = trigger;
+    read_count_ = 0;
+  }
+
+  IOStatus NewRandomAccessFile(const std::string& fname,
+                               const FileOptions& opts,
+                               std::unique_ptr<FSRandomAccessFile>* result,
+                               IODebugContext* dbg) override {
+    class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
+     public:
+      CorruptionRandomAccessFile(CorruptionFS& fs,
+                                 std::unique_ptr<FSRandomAccessFile>& file)
+          : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
+
+      IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
+                    Slice* result, char* scratch,
+                    IODebugContext* dbg) const override {
+        IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
+        if (opts.verify_and_reconstruct_read) {
+          return s;
+        }
+        if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
+          fs_.read_count_ = 0;
+          fs_.corruption_trigger_ = INT_MAX;
+          char* data = const_cast<char*>(result->data());
+          std::memcpy(
+              data,
+              fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
+              result->size());
+        }
+        return s;
+      }
+
+      IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
+                         const IOOptions& options,
+                         IODebugContext* dbg) override {
+        return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
+      }
+
+     private:
+      CorruptionFS& fs_;
+    };
+
+    std::unique_ptr<FSRandomAccessFile> file;
+    IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
+    EXPECT_OK(s);
+    result->reset(new CorruptionRandomAccessFile(*this, file));
+
+    return s;
+  }
+
+  void SupportedOps(int64_t& supported_ops) override {
+    supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
+                    1 << FSSupportedOps::kAsyncIO;
+  }
+
+ private:
+  int corruption_trigger_;
+  int read_count_;
+  Random rnd_;
+};
+}  // anonymous namespace

 class DBIOFailureTest : public DBTestBase {
  public:
@@ -579,6 +673,130 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
   ASSERT_EQ("bar3", Get(1, "foo"));
 }
 #endif  // !(defined NDEBUG) || !defined(OS_WIN)
+
+class DBIOCorruptionTest : public DBIOFailureTest,
+                           public testing::WithParamInterface<bool> {
+ public:
+  DBIOCorruptionTest() : DBIOFailureTest() {
+    BlockBasedTableOptions bbto;
+    Options options = CurrentOptions();
+
+    base_env_ = env_;
+    EXPECT_NE(base_env_, nullptr);
+    fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
+    env_guard_ = NewCompositeEnv(fs_);
+    options.env = env_guard_.get();
+    bbto.num_file_reads_for_auto_readahead = 0;
+    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    options.disable_auto_compactions = true;
+
+    Reopen(options);
+  }
+
+  ~DBIOCorruptionTest() {
+    Close();
+    db_ = nullptr;
+  }
+
+ protected:
+  std::unique_ptr<Env> env_guard_;
+  std::shared_ptr<CorruptionFS> fs_;
+  Env* base_env_;
+};
+
+TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  std::string val;
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+  ASSERT_EQ(val, "val1");
+}
+
+TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  ReadOptions ro;
+  ro.readahead_size = 65536;
+  ro.async_io = GetParam();
+
+  Iterator* iter = dbfull()->NewIterator(ro);
+  iter->SeekToFirst();
+  while (iter->status().ok() && iter->Valid()) {
+    iter->Next();
+  }
+  ASSERT_OK(iter->status());
+  delete iter;
+}
+
+TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Put("key2", "val2"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+
+  std::vector<std::string> keystr{"key1", "key2"};
+  std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
+  std::vector<PinnableSlice> values(keys.size());
+  std::vector<Status> statuses(keys.size());
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values[0].ToString(), "val1");
+  ASSERT_EQ(values[1].ToString(), "val2");
+}
+
+TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  ASSERT_OK(Put("key3", "val3"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("key2", "val2"));
+  ASSERT_OK(Flush());
+  fs->SetCorruptionTrigger(1);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr,
+                                   nullptr));
+
+  std::string val;
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+  ASSERT_EQ(val, "val1");
+}
+
+TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
+  CorruptionFS* fs =
+      static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
+
+  ASSERT_OK(Put("key1", "val1"));
+  fs->SetCorruptionTrigger(1);
+  ASSERT_OK(Flush());
+
+  std::string val;
+  ReadOptions ro;
+  ro.async_io = GetParam();
+  ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+  ASSERT_EQ(val, "val1");
+}
+
+INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
+                        testing::Bool());
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h
index c1d9b87add..eea69f43cc 100644
--- a/include/rocksdb/file_system.h
+++ b/include/rocksdb/file_system.h
@@ -150,7 +150,8 @@ struct IOOptions {
         rate_limiter_priority(Env::IO_TOTAL),
         type(IOType::kUnknown),
         force_dir_fsync(force_dir_fsync_),
-        do_not_recurse(false) {}
+        do_not_recurse(false),
+        verify_and_reconstruct_read(false) {}
 };

 struct DirFsyncOptions {
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index 7e39ccb1b7..47bf8445fc 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -526,6 +526,9 @@ enum Tickers : uint32_t {
   // Number of FS reads avoided due to scan prefetching
   PREFETCH_HITS,

+  // Footer corruption detected when opening an SST file for reading
+  SST_FOOTER_CORRUPTION_COUNT,
+
   TICKER_ENUM_MAX
 };

diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h
index 17461af30f..58fdfd9fa5 100644
--- a/java/rocksjni/portal.h
+++ b/java/rocksjni/portal.h
@@ -5267,6 +5267,8 @@ class TickerTypeJni {
         return -0x52;
       case ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS:
         return -0x53;
+      case ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT:
+        return -0x55;
       case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
         // -0x54 is the max value at this time. Since these values are exposed
         // directly to Java clients, we'll keep the value the same till the next
@@ -5722,6 +5724,8 @@ class TickerTypeJni {
         return ROCKSDB_NAMESPACE::Tickers::PREFETCH_BYTES_USEFUL;
       case -0x53:
         return ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS;
+      case -0x55:
+        return ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT;
       case -0x54:
        // -0x54 is the max value at this time. Since these values are exposed
        // directly to Java clients, we'll keep the value the same till the next
diff --git a/java/src/main/java/org/rocksdb/TickerType.java b/java/src/main/java/org/rocksdb/TickerType.java
index 1cddbb66b6..90f0b6ba2e 100644
--- a/java/src/main/java/org/rocksdb/TickerType.java
+++ b/java/src/main/java/org/rocksdb/TickerType.java
@@ -876,6 +876,8 @@ public enum TickerType {

   PREFETCH_HITS((byte) -0x53),

+  SST_FOOTER_CORRUPTION_COUNT((byte) -0x55),
+
   TICKER_ENUM_MAX((byte) -0x54);

   private final byte value;
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index fdd599aa3f..ed9a089af5 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -265,6 +265,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {PREFETCH_BYTES, "rocksdb.prefetch.bytes"},
     {PREFETCH_BYTES_USEFUL, "rocksdb.prefetch.bytes.useful"},
     {PREFETCH_HITS, "rocksdb.prefetch.hits"},
+    {SST_FOOTER_CORRUPTION_COUNT, "rocksdb.footer.corruption.count"},
 };

 const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 27c34361c3..b46619eddc 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -631,6 +631,19 @@ Status BlockBasedTable::Open(
                              prefetch_buffer.get(), file_size, &footer,
                              kBlockBasedTableMagicNumber);
   }
+  // If the footer is corrupted and the FS supports checksum verification and
+  // correction, try reading the footer again
+  if (s.IsCorruption()) {
+    RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
+    if (CheckFSFeatureSupport(ioptions.fs.get(),
+                              FSSupportedOps::kVerifyAndReconstructRead)) {
+      IOOptions retry_opts = opts;
+      retry_opts.verify_and_reconstruct_read = true;
+      s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
+                             prefetch_buffer.get(), file_size, &footer,
+                             kBlockBasedTableMagicNumber);
+    }
+  }
   if (!s.ok()) {
     return s;
   }
diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h
index c4483a0dec..98cf73dcac 100644
--- a/table/block_based/block_based_table_reader_sync_and_async.h
+++ b/table/block_based/block_based_table_reader_sync_and_async.h
@@ -214,16 +214,42 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)

     if (options.verify_checksums) {
       PERF_TIMER_GUARD(block_checksum_time);
-      const char* data = req.result.data();
+      const char* data = serialized_block.data.data();
       // Since the scratch might be shared, the offset of the data block in
       // the buffer might not be 0. req.result.data() only point to the
       // begin address of each read request, we need to add the offset
       // in each read request. Checksum is stored in the block trailer,
       // beyond the payload size.
-      s = VerifyBlockChecksum(footer, data + req_offset, handle.size(),
+      s = VerifyBlockChecksum(footer, data, handle.size(),
                               rep_->file->file_name(), handle.offset());
       RecordTick(ioptions.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
       TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
+      if (!s.ok() &&
+          CheckFSFeatureSupport(ioptions.fs.get(),
+                                FSSupportedOps::kVerifyAndReconstructRead)) {
+        assert(s.IsCorruption());
+        assert(!ioptions.allow_mmap_reads);
+        RecordTick(ioptions.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
+
+        // Repeat the read for this particular block using the regular
+        // synchronous Read API.
+        // We can use the same chunk of memory pointed to by data, since
+        // the size is identical and we know it's not a memory mapped file.
+        Slice result;
+        IOOptions opts;
+        IOStatus io_s = file->PrepareIOOptions(options, opts);
+        opts.verify_and_reconstruct_read = true;
+        io_s = file->Read(opts, handle.offset(), BlockSizeWithTrailer(handle),
+                          &result, const_cast<char*>(data), nullptr);
+        if (io_s.ok()) {
+          assert(result.data() == data);
+          assert(result.size() == BlockSizeWithTrailer(handle));
+          s = VerifyBlockChecksum(footer, data, handle.size(),
+                                  rep_->file->file_name(), handle.offset());
+        } else {
+          s = io_s;
+        }
+      }
     }
   } else if (!use_shared_buffer) {
     // Free the allocated scratch buffer.
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index 273c9a4372..848b83a101 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -81,11 +81,12 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
                                          &io_s, for_compaction_);
     if (read_from_prefetch_buffer) {
       ProcessTrailerIfPresent();
-      if (!io_status_.ok()) {
+      if (io_status_.ok()) {
+        got_from_prefetch_buffer_ = true;
+        used_buf_ = const_cast<char*>(slice_.data());
+      } else if (!(io_status_.IsCorruption() && retry_corrupt_read_)) {
         return true;
       }
-      got_from_prefetch_buffer_ = true;
-      used_buf_ = const_cast<char*>(slice_.data());
     }
   }
   if (!io_s.ok()) {
@@ -237,6 +238,113 @@ inline void BlockFetcher::GetBlockContents() {
 #endif
 }

+// Read a block from the file and verify its checksum. Upon return, io_status_
+// will be updated with the status of the read, and slice_ will be updated
+// with a pointer to the data.
+void BlockFetcher::ReadBlock(bool retry) {
+  FSReadRequest read_req;
+  IOOptions opts;
+  io_status_ = file_->PrepareIOOptions(read_options_, opts);
+  opts.verify_and_reconstruct_read = retry;
+  read_req.status.PermitUncheckedError();
+  // Actual file read
+  if (io_status_.ok()) {
+    if (file_->use_direct_io()) {
+      PERF_TIMER_GUARD(block_read_time);
+      PERF_CPU_TIMER_GUARD(
+          block_read_cpu_time,
+          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+      io_status_ =
+          file_->Read(opts, handle_.offset(), block_size_with_trailer_,
+                      &slice_, /*scratch=*/nullptr, &direct_io_buf_);
+      PERF_COUNTER_ADD(block_read_count, 1);
+      used_buf_ = const_cast<char*>(slice_.data());
+    } else if (use_fs_scratch_) {
+      PERF_TIMER_GUARD(block_read_time);
+      PERF_CPU_TIMER_GUARD(
+          block_read_cpu_time,
+          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+      read_req.offset = handle_.offset();
+      read_req.len = block_size_with_trailer_;
+      read_req.scratch = nullptr;
+      io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
+                                    /*AlignedBuf* =*/nullptr);
+      PERF_COUNTER_ADD(block_read_count, 1);
+
+      slice_ = Slice(read_req.result.data(), read_req.result.size());
+      used_buf_ = const_cast<char*>(slice_.data());
+    } else {
+      // It allocates/assigns used_buf_
+      PrepareBufferForBlockFromFile();
+
+      PERF_TIMER_GUARD(block_read_time);
+      PERF_CPU_TIMER_GUARD(
+          block_read_cpu_time,
+          ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
+
+      io_status_ = file_->Read(
+          opts, handle_.offset(), /*size*/ block_size_with_trailer_,
+          /*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
+      PERF_COUNTER_ADD(block_read_count, 1);
+#ifndef NDEBUG
+      if (slice_.data() == &stack_buf_[0]) {
+        num_stack_buf_memcpy_++;
+      } else if (slice_.data() == heap_buf_.get()) {
+        num_heap_buf_memcpy_++;
+      } else if (slice_.data() == compressed_buf_.get()) {
+        num_compressed_buf_memcpy_++;
+      }
+#endif
+    }
+  }
+
+  // TODO: introduce dedicated perf counter for range tombstones
+  switch (block_type_) {
+    case BlockType::kFilter:
+    case BlockType::kFilterPartitionIndex:
+      PERF_COUNTER_ADD(filter_block_read_count, 1);
+      break;
+
+    case BlockType::kCompressionDictionary:
+      PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
+      break;
+
+    case BlockType::kIndex:
+      PERF_COUNTER_ADD(index_block_read_count, 1);
+      break;
+
+    // Nothing to do here as we don't have counters for the other types.
+    default:
+      break;
+  }
+
+  PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
+  if (io_status_.ok()) {
+    if (use_fs_scratch_ && !read_req.status.ok()) {
+      io_status_ = read_req.status;
+    } else if (slice_.size() != block_size_with_trailer_) {
+      io_status_ = IOStatus::Corruption(
+          "truncated block read from " + file_->file_name() + " offset " +
+          std::to_string(handle_.offset()) + ", expected " +
+          std::to_string(block_size_with_trailer_) + " bytes, got " +
+          std::to_string(slice_.size()));
+    }
+  }
+
+  if (io_status_.ok()) {
+    ProcessTrailerIfPresent();
+  }
+
+  if (io_status_.ok()) {
+    InsertCompressedBlockToPersistentCacheIfNeeded();
+  } else {
+    ReleaseFileSystemProvidedBuffer(&read_req);
+    direct_io_buf_.reset();
+    compressed_buf_.reset();
+    heap_buf_.reset();
+    used_buf_ = nullptr;
+  }
+}
+
 IOStatus BlockFetcher::ReadBlockContents() {
   FSReadRequest read_req;
   read_req.status.PermitUncheckedError();
@@ -252,104 +360,13 @@ IOStatus BlockFetcher::ReadBlockContents() {
       return io_status_;
     }
   } else if (!TryGetSerializedBlockFromPersistentCache()) {
-    IOOptions opts;
-    io_status_ = file_->PrepareIOOptions(read_options_, opts);
-    // Actual file read
-    if (io_status_.ok()) {
-      if (file_->use_direct_io()) {
-        PERF_TIMER_GUARD(block_read_time);
-        PERF_CPU_TIMER_GUARD(
-            block_read_cpu_time,
-            ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
-        io_status_ =
-            file_->Read(opts, handle_.offset(), block_size_with_trailer_,
-                        &slice_, /*scratch=*/nullptr, &direct_io_buf_);
-        PERF_COUNTER_ADD(block_read_count, 1);
-        used_buf_ = const_cast<char*>(slice_.data());
-      } else if (use_fs_scratch_) {
-        PERF_TIMER_GUARD(block_read_time);
-        PERF_CPU_TIMER_GUARD(
-            block_read_cpu_time,
-            ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
-        read_req.offset = handle_.offset();
-        read_req.len = block_size_with_trailer_;
-        read_req.scratch = nullptr;
-        io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
-                                      /*AlignedBuf* =*/nullptr);
-        PERF_COUNTER_ADD(block_read_count, 1);
-
-        slice_ = Slice(read_req.result.data(), read_req.result.size());
-        used_buf_ = const_cast<char*>(slice_.data());
-      } else {
-        // It allocates/assign used_buf_
-        PrepareBufferForBlockFromFile();
-
-        PERF_TIMER_GUARD(block_read_time);
-        PERF_CPU_TIMER_GUARD(
-            block_read_cpu_time,
-            ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
-
-        io_status_ = file_->Read(
-            opts, handle_.offset(), /*size*/ block_size_with_trailer_,
-            /*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
-        PERF_COUNTER_ADD(block_read_count, 1);
-#ifndef NDEBUG
-        if (slice_.data() == &stack_buf_[0]) {
-          num_stack_buf_memcpy_++;
-        } else if (slice_.data() == heap_buf_.get()) {
-          num_heap_buf_memcpy_++;
-        } else if (slice_.data() == compressed_buf_.get()) {
-          num_compressed_buf_memcpy_++;
-        }
-#endif
-      }
+    ReadBlock(/*retry=*/false);
+    // If the file system supports retry after corruption, then try to
+    // re-read the block and see if it succeeds.
+    if (io_status_.IsCorruption() && retry_corrupt_read_) {
+      ReadBlock(/*retry=*/true);
     }
-
-    // TODO: introduce dedicated perf counter for range tombstones
-    switch (block_type_) {
-      case BlockType::kFilter:
-      case BlockType::kFilterPartitionIndex:
-        PERF_COUNTER_ADD(filter_block_read_count, 1);
-        break;
-
-      case BlockType::kCompressionDictionary:
-        PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
-        break;
-
-      case BlockType::kIndex:
-        PERF_COUNTER_ADD(index_block_read_count, 1);
-        break;
-
-      // Nothing to do here as we don't have counters for the other types.
-      default:
-        break;
-    }
-
-    PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
     if (!io_status_.ok()) {
-      ReleaseFileSystemProvidedBuffer(&read_req);
-      return io_status_;
-    }
-
-    if (use_fs_scratch_ && !read_req.status.ok()) {
-      ReleaseFileSystemProvidedBuffer(&read_req);
-      return read_req.status;
-    }
-
-    if (slice_.size() != block_size_with_trailer_) {
-      ReleaseFileSystemProvidedBuffer(&read_req);
-      return IOStatus::Corruption(
-          "truncated block read from " + file_->file_name() + " offset " +
-          std::to_string(handle_.offset()) + ", expected " +
-          std::to_string(block_size_with_trailer_) + " bytes, got " +
-          std::to_string(slice_.size()));
-    }
-
-    ProcessTrailerIfPresent();
-    if (io_status_.ok()) {
-      InsertCompressedBlockToPersistentCacheIfNeeded();
-    } else {
-      ReleaseFileSystemProvidedBuffer(&read_req);
       return io_status_;
     }
   }
@@ -402,6 +419,10 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
       // Data Block is already in prefetch.
       got_from_prefetch_buffer_ = true;
       ProcessTrailerIfPresent();
+      if (io_status_.IsCorruption() && retry_corrupt_read_) {
+        got_from_prefetch_buffer_ = false;
+        ReadBlock(/*retry=*/true);
+      }
       if (!io_status_.ok()) {
         return io_status_;
       }
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index 8fc615ed0e..29efb995a7 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -72,6 +72,10 @@ class BlockFetcher {
     if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
       use_fs_scratch_ = true;
     }
+    if (CheckFSFeatureSupport(ioptions_.fs.get(),
+                              FSSupportedOps::kVerifyAndReconstructRead)) {
+      retry_corrupt_read_ = true;
+    }
   }

   IOStatus ReadBlockContents();
@@ -132,6 +136,7 @@ class BlockFetcher {
   CompressionType compression_type_;
   bool for_compaction_ = false;
   bool use_fs_scratch_ = false;
+  bool retry_corrupt_read_ = false;

   // return true if found
   bool TryGetUncompressBlockFromPersistentCache();
@@ -147,6 +152,7 @@ class BlockFetcher {
   void InsertCompressedBlockToPersistentCacheIfNeeded();
   void InsertUncompressedBlockToPersistentCacheIfNeeded();
   void ProcessTrailerIfPresent();
+  void ReadBlock(bool retry);

   void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
     if (use_fs_scratch_) {
diff --git a/unreleased_history/new_features/retry_on_corruption.md b/unreleased_history/new_features/retry_on_corruption.md
new file mode 100644
index 0000000000..807c1b58bb
--- /dev/null
+++ b/unreleased_history/new_features/retry_on_corruption.md
@@ -0,0 +1 @@
+On file systems that support storage level data checksum and reconstruction, retry SST block reads for point lookups, scans, and flush and compaction if there's a checksum mismatch on the initial read.
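---
For illustration only (not part of the patch): a minimal sketch of how a
FileSystem implementation might advertise the new capability. It uses only
APIs that appear above (FileSystemWrapper, SupportedOps, FSSupportedOps); the
class name ReconstructingFS is hypothetical. When RocksDB retries a corrupted
read, it sets IOOptions::verify_and_reconstruct_read = true on the retry, so
the FSRandomAccessFile returned by such a file system can check that flag to
decide when to attempt storage-level reconstruction.

  #include "rocksdb/file_system.h"

  namespace ROCKSDB_NAMESPACE {

  // Hypothetical file system that supports verify-and-reconstruct reads.
  class ReconstructingFS : public FileSystemWrapper {
   public:
    explicit ReconstructingFS(const std::shared_ptr<FileSystem>& target)
        : FileSystemWrapper(target) {}
    const char* Name() const override { return "ReconstructingFS"; }

    // Advertise the capability. RocksDB checks this bit via
    // CheckFSFeatureSupport(), which enables retry_corrupt_read_ in
    // BlockFetcher and the footer re-read in BlockBasedTable::Open().
    void SupportedOps(int64_t& supported_ops) override {
      target()->SupportedOps(supported_ops);
      supported_ops |= 1 << FSSupportedOps::kVerifyAndReconstructRead;
    }
  };

  }  // namespace ROCKSDB_NAMESPACE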