Retry block reads on checksum mismatch (#12427)

Summary:
On file systems that support storage-level data checksums and reconstruction, retry SST block reads for point lookups, scans, and flushes and compactions if there's a checksum mismatch on the initial read. A file system can indicate its support by setting the `FSSupportedOps::kVerifyAndReconstructRead` bit in `SupportedOps`.
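
For reference, a minimal sketch (not part of this PR; the class and its name are hypothetical) of how a `FileSystem` implementation might advertise this capability:

```cpp
#include "rocksdb/file_system.h"

using ROCKSDB_NAMESPACE::FileSystem;
using ROCKSDB_NAMESPACE::FileSystemWrapper;
using ROCKSDB_NAMESPACE::FSSupportedOps;

// Hypothetical file system that performs storage-level checksum verification
// and reconstruction when asked via IOOptions::verify_and_reconstruct_read.
class CheckedFS : public FileSystemWrapper {
 public:
  explicit CheckedFS(const std::shared_ptr<FileSystem>& target)
      : FileSystemWrapper(target) {}
  const char* Name() const override { return "CheckedFS"; }

  void SupportedOps(int64_t& supported_ops) override {
    // Advertise support; RocksDB will then retry corrupt block reads with
    // IOOptions::verify_and_reconstruct_read set.
    supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead;
  }
};
```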

Tests:
Add new unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12427

Reviewed By: ajkr

Differential Revision: D55025941

Pulled By: anand1976

fbshipit-source-id: dbd990cb75e03f756c8a66d42956f645c0b6d55e
Author: anand76
Date: 2024-03-18 16:16:05 -07:00
Committed by: Facebook GitHub Bot
Parent: b4e9f5a400
Commit: 4868c10b44
11 changed files with 398 additions and 102 deletions


@@ -13,6 +13,100 @@
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// A wrapper that allows injection of errors.
class CorruptionFS : public FileSystemWrapper {
public:
bool writable_file_error_;
int num_writable_file_errors_;
explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
: FileSystemWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0),
corruption_trigger_(INT_MAX),
read_count_(0),
rnd_(300) {}
~CorruptionFS() override {
// Assert that the corruption was reset, which means it got triggered
assert(corruption_trigger_ == INT_MAX);
}
const char* Name() const override { return "CorruptionFS"; }
IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return IOStatus::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, opts, result, dbg);
}
void SetCorruptionTrigger(const int trigger) {
corruption_trigger_ = trigger;
read_count_ = 0;
}
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
CorruptionRandomAccessFile(CorruptionFS& fs,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
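// Do not inject corruption into a retry read, i.e. one issued with
// verify_and_reconstruct_read set, so the retry can succeed.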
if (opts.verify_and_reconstruct_read) {
return s;
}
if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
fs_.read_count_ = 0;
fs_.corruption_trigger_ = INT_MAX;
char* data = const_cast<char*>(result->data());
std::memcpy(
data,
fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
result->size());
}
return s;
}
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) override {
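// Defer to the default FSRandomAccessFile::MultiRead, which issues the
// requests as sequential Read() calls, so the corruption injection in
// Read() above covers MultiRead as well.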
return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
}
private:
CorruptionFS& fs_;
};
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
EXPECT_OK(s);
result->reset(new CorruptionRandomAccessFile(*this, file));
return s;
}
void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
1 << FSSupportedOps::kAsyncIO;
}
private:
int corruption_trigger_;
int read_count_;
Random rnd_;
};
} // anonymous namespace
class DBIOFailureTest : public DBTestBase {
public:
@@ -579,6 +673,130 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
ASSERT_EQ("bar3", Get(1, "foo"));
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)
class DBIOCorruptionTest : public DBIOFailureTest,
public testing::WithParamInterface<bool> {
public:
DBIOCorruptionTest() : DBIOFailureTest() {
BlockBasedTableOptions bbto;
Options options = CurrentOptions();
base_env_ = env_;
EXPECT_NE(base_env_, nullptr);
fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
env_guard_ = NewCompositeEnv(fs_);
options.env = env_guard_.get();
bbto.num_file_reads_for_auto_readahead = 0;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.disable_auto_compactions = true;
Reopen(options);
}
~DBIOCorruptionTest() {
Close();
db_ = nullptr;
}
protected:
std::unique_ptr<Env> env_guard_;
std::shared_ptr<CorruptionFS> fs_;
Env* base_env_;
};
TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}
TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
ReadOptions ro;
ro.readahead_size = 65536;
ro.async_io = GetParam();
Iterator* iter = dbfull()->NewIterator(ro);
iter->SeekToFirst();
while (iter->status().ok() && iter->Valid()) {
iter->Next();
}
ASSERT_OK(iter->status());
delete iter;
}
TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
std::vector<std::string> keystr{"key1", "key2"};
std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
std::vector<PinnableSlice> values(keys.size());
std::vector<Status> statuses(keys.size());
ReadOptions ro;
ro.async_io = GetParam();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values[0].ToString(), "val1");
ASSERT_EQ(values[1].ToString(), "val2");
}
TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Put("key3", "val3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}
TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
fs->SetCorruptionTrigger(1);
ASSERT_OK(Flush());
std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
testing::Bool());
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {


@@ -150,7 +150,8 @@ struct IOOptions {
rate_limiter_priority(Env::IO_TOTAL),
type(IOType::kUnknown),
force_dir_fsync(force_dir_fsync_),
- do_not_recurse(false) {}
+ do_not_recurse(false),
+ verify_and_reconstruct_read(false) {}
};
struct DirFsyncOptions {
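
For illustration, a sketch of how a file system's random-access file might honor the new `verify_and_reconstruct_read` hint; the wrapper class and its `VerifiedRead` helper are hypothetical, not part of this PR:

```cpp
#include "rocksdb/file_system.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical wrapper: take a checked read path when RocksDB retries a
// corrupt block read with verify_and_reconstruct_read set.
class CheckedRandomAccessFile : public FSRandomAccessFileWrapper {
 public:
  explicit CheckedRandomAccessFile(FSRandomAccessFile* target)
      : FSRandomAccessFileWrapper(target) {}

  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    if (opts.verify_and_reconstruct_read) {
      // RocksDB sets this hint only on the retry after a checksum mismatch.
      return VerifiedRead(offset, n, result, scratch);
    }
    return target()->Read(offset, n, opts, result, scratch, dbg);
  }

 private:
  // Hypothetical helper: a real implementation would verify storage-level
  // checksums and reconstruct bad data (e.g. from replicas or parity)
  // before returning it.
  IOStatus VerifiedRead(uint64_t /*offset*/, size_t /*n*/, Slice* /*result*/,
                        char* /*scratch*/) const {
    return IOStatus::NotSupported("verified read not implemented here");
  }
};
```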


@@ -526,6 +526,9 @@ enum Tickers : uint32_t {
// Number of FS reads avoided due to scan prefetching
PREFETCH_HITS,
// Footer corruption detected when opening an SST file for reading
SST_FOOTER_CORRUPTION_COUNT,
TICKER_ENUM_MAX
};
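
A brief usage sketch for the new ticker (assuming statistics were enabled on the DB via `Options::statistics`, e.g. with `CreateDBStatistics()`):

```cpp
#include "rocksdb/statistics.h"

// Hypothetical check after running a workload: how many SST footer
// corruptions were detected when opening tables for reads.
uint64_t footer_corruptions = options.statistics->getTickerCount(
    ROCKSDB_NAMESPACE::SST_FOOTER_CORRUPTION_COUNT);
```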


@@ -5267,6 +5267,8 @@ class TickerTypeJni {
return -0x52;
case ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS:
return -0x53;
case ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT:
return -0x55;
case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next
@@ -5722,6 +5724,8 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_BYTES_USEFUL;
case -0x53:
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS;
case -0x55:
return ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT;
case -0x54:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next


@@ -876,6 +876,8 @@ public enum TickerType {
PREFETCH_HITS((byte) -0x53),
SST_FOOTER_CORRUPTION_COUNT((byte) -0x55),
TICKER_ENUM_MAX((byte) -0x54);
private final byte value;


@@ -265,6 +265,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{PREFETCH_BYTES, "rocksdb.prefetch.bytes"},
{PREFETCH_BYTES_USEFUL, "rocksdb.prefetch.bytes.useful"},
{PREFETCH_HITS, "rocksdb.prefetch.hits"},
{SST_FOOTER_CORRUPTION_COUNT, "rocksdb.footer.corruption.count"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {


@@ -631,6 +631,19 @@ Status BlockBasedTable::Open(
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
// If the footer is corrupted and the FS supports checksum verification and
// correction, try reading the footer again
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
if (CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions retry_opts = opts;
retry_opts.verify_and_reconstruct_read = true;
s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
}
if (!s.ok()) {
return s;
}


@@ -214,16 +214,42 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
- const char* data = req.result.data();
+ const char* data = serialized_block.data.data();
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. req.result.data() only points to the
// begin address of each read request; we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// beyond the payload size.
- s = VerifyBlockChecksum(footer, data + req_offset, handle.size(),
+ s = VerifyBlockChecksum(footer, data, handle.size(),
rep_->file->file_name(), handle.offset());
RecordTick(ioptions.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
if (!s.ok() &&
CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
assert(s.IsCorruption());
assert(!ioptions.allow_mmap_reads);
RecordTick(ioptions.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
// Repeat the read for this particular block using the regular
// synchronous Read API. We can use the same chunk of memory
// pointed to by data, since the size is identical and we know
// it's not a memory mapped file
Slice result;
IOOptions opts;
IOStatus io_s = file->PrepareIOOptions(options, opts);
opts.verify_and_reconstruct_read = true;
io_s = file->Read(opts, handle.offset(), BlockSizeWithTrailer(handle),
&result, const_cast<char*>(data), nullptr);
if (io_s.ok()) {
assert(result.data() == data);
assert(result.size() == BlockSizeWithTrailer(handle));
s = VerifyBlockChecksum(footer, data, handle.size(),
rep_->file->file_name(), handle.offset());
} else {
s = io_s;
}
}
}
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.


@@ -81,11 +81,12 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
&io_s, for_compaction_);
if (read_from_prefetch_buffer) {
ProcessTrailerIfPresent();
- if (!io_status_.ok()) {
+ if (io_status_.ok()) {
+ got_from_prefetch_buffer_ = true;
+ used_buf_ = const_cast<char*>(slice_.data());
+ } else if (!(io_status_.IsCorruption() && retry_corrupt_read_)) {
return true;
}
- got_from_prefetch_buffer_ = true;
- used_buf_ = const_cast<char*>(slice_.data());
}
}
if (!io_s.ok()) {
@@ -237,6 +238,113 @@ inline void BlockFetcher::GetBlockContents() {
#endif
}
// Read a block from the file and verify its checksum. Upon return, io_status_
// will be updated with the status of the read, and slice_ will be updated
// with a pointer to the data.
void BlockFetcher::ReadBlock(bool retry) {
FSReadRequest read_req;
IOOptions opts;
io_status_ = file_->PrepareIOOptions(read_options_, opts);
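// On a retry, pass the verify_and_reconstruct_read hint so a file system
// that supports FSSupportedOps::kVerifyAndReconstructRead can verify its
// storage-level checksums and reconstruct the data.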
opts.verify_and_reconstruct_read = retry;
read_req.status.PermitUncheckedError();
// Actual file read
if (io_status_.ok()) {
if (file_->use_direct_io()) {
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, /*scratch=*/nullptr, &direct_io_buf_);
PERF_COUNTER_ADD(block_read_count, 1);
used_buf_ = const_cast<char*>(slice_.data());
} else if (use_fs_scratch_) {
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
read_req.offset = handle_.offset();
read_req.len = block_size_with_trailer_;
read_req.scratch = nullptr;
io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
/*AlignedBuf* =*/nullptr);
PERF_COUNTER_ADD(block_read_count, 1);
slice_ = Slice(read_req.result.data(), read_req.result.size());
used_buf_ = const_cast<char*>(slice_.data());
} else {
// It allocates/assigns used_buf_
PrepareBufferForBlockFromFile();
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ = file_->Read(
opts, handle_.offset(), /*size*/ block_size_with_trailer_,
/*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
if (slice_.data() == &stack_buf_[0]) {
num_stack_buf_memcpy_++;
} else if (slice_.data() == heap_buf_.get()) {
num_heap_buf_memcpy_++;
} else if (slice_.data() == compressed_buf_.get()) {
num_compressed_buf_memcpy_++;
}
#endif
}
}
// TODO: introduce dedicated perf counter for range tombstones
switch (block_type_) {
case BlockType::kFilter:
case BlockType::kFilterPartitionIndex:
PERF_COUNTER_ADD(filter_block_read_count, 1);
break;
case BlockType::kCompressionDictionary:
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
break;
case BlockType::kIndex:
PERF_COUNTER_ADD(index_block_read_count, 1);
break;
// Nothing to do here as we don't have counters for the other types.
default:
break;
}
PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
if (io_status_.ok()) {
if (use_fs_scratch_ && !read_req.status.ok()) {
io_status_ = read_req.status;
} else if (slice_.size() != block_size_with_trailer_) {
io_status_ = IOStatus::Corruption(
"truncated block read from " + file_->file_name() + " offset " +
std::to_string(handle_.offset()) + ", expected " +
std::to_string(block_size_with_trailer_) + " bytes, got " +
std::to_string(slice_.size()));
}
}
if (io_status_.ok()) {
ProcessTrailerIfPresent();
}
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
} else {
ReleaseFileSystemProvidedBuffer(&read_req);
direct_io_buf_.reset();
compressed_buf_.reset();
heap_buf_.reset();
used_buf_ = nullptr;
}
}
IOStatus BlockFetcher::ReadBlockContents() {
FSReadRequest read_req;
read_req.status.PermitUncheckedError();
@@ -252,104 +360,13 @@ IOStatus BlockFetcher::ReadBlockContents() {
return io_status_;
}
} else if (!TryGetSerializedBlockFromPersistentCache()) {
IOOptions opts;
io_status_ = file_->PrepareIOOptions(read_options_, opts);
// Actual file read
if (io_status_.ok()) {
if (file_->use_direct_io()) {
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, /*scratch=*/nullptr, &direct_io_buf_);
PERF_COUNTER_ADD(block_read_count, 1);
used_buf_ = const_cast<char*>(slice_.data());
} else if (use_fs_scratch_) {
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
read_req.offset = handle_.offset();
read_req.len = block_size_with_trailer_;
read_req.scratch = nullptr;
io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
/*AlignedBuf* =*/nullptr);
PERF_COUNTER_ADD(block_read_count, 1);
slice_ = Slice(read_req.result.data(), read_req.result.size());
used_buf_ = const_cast<char*>(slice_.data());
} else {
// It allocates/assigns used_buf_
PrepareBufferForBlockFromFile();
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ = file_->Read(
opts, handle_.offset(), /*size*/ block_size_with_trailer_,
/*result*/ &slice_, /*scratch*/ used_buf_, /*aligned_buf=*/nullptr);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
if (slice_.data() == &stack_buf_[0]) {
num_stack_buf_memcpy_++;
} else if (slice_.data() == heap_buf_.get()) {
num_heap_buf_memcpy_++;
} else if (slice_.data() == compressed_buf_.get()) {
num_compressed_buf_memcpy_++;
}
#endif
}
ReadBlock(/*retry =*/false);
// If the file system supports retry after corruption, then try to
// re-read the block and see if it succeeds.
if (io_status_.IsCorruption() && retry_corrupt_read_) {
ReadBlock(/*retry=*/true);
}
// TODO: introduce dedicated perf counter for range tombstones
switch (block_type_) {
case BlockType::kFilter:
case BlockType::kFilterPartitionIndex:
PERF_COUNTER_ADD(filter_block_read_count, 1);
break;
case BlockType::kCompressionDictionary:
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
break;
case BlockType::kIndex:
PERF_COUNTER_ADD(index_block_read_count, 1);
break;
// Nothing to do here as we don't have counters for the other types.
default:
break;
}
PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
if (!io_status_.ok()) {
ReleaseFileSystemProvidedBuffer(&read_req);
return io_status_;
}
if (use_fs_scratch_ && !read_req.status.ok()) {
ReleaseFileSystemProvidedBuffer(&read_req);
return read_req.status;
}
if (slice_.size() != block_size_with_trailer_) {
ReleaseFileSystemProvidedBuffer(&read_req);
return IOStatus::Corruption(
"truncated block read from " + file_->file_name() + " offset " +
std::to_string(handle_.offset()) + ", expected " +
std::to_string(block_size_with_trailer_) + " bytes, got " +
std::to_string(slice_.size()));
}
ProcessTrailerIfPresent();
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
} else {
ReleaseFileSystemProvidedBuffer(&read_req);
return io_status_;
}
}
@@ -402,6 +419,10 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
// Data Block is already in prefetch.
got_from_prefetch_buffer_ = true;
ProcessTrailerIfPresent();
if (io_status_.IsCorruption() && retry_corrupt_read_) {
got_from_prefetch_buffer_ = false;
ReadBlock(/*retry = */ true);
}
if (!io_status_.ok()) {
return io_status_;
}


@@ -72,6 +72,10 @@ class BlockFetcher {
if (CheckFSFeatureSupport(ioptions_.fs.get(), FSSupportedOps::kFSBuffer)) {
use_fs_scratch_ = true;
}
if (CheckFSFeatureSupport(ioptions_.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
retry_corrupt_read_ = true;
}
}
IOStatus ReadBlockContents();
@@ -132,6 +136,7 @@
CompressionType compression_type_;
bool for_compaction_ = false;
bool use_fs_scratch_ = false;
bool retry_corrupt_read_ = false;
// return true if found
bool TryGetUncompressBlockFromPersistentCache();
@@ -147,6 +152,7 @@
void InsertCompressedBlockToPersistentCacheIfNeeded();
void InsertUncompressedBlockToPersistentCacheIfNeeded();
void ProcessTrailerIfPresent();
void ReadBlock(bool retry);
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
if (use_fs_scratch_) {


@@ -0,0 +1 @@
On file systems that support storage-level data checksums and reconstruction, retry SST block reads for point lookups, scans, and flushes and compactions if there's a checksum mismatch on the initial read.