Add some unit tests when file read returns error during compaction/scanning (#11788)

Summary:
Some repro unit tests for the bug fixed in https://github.com/facebook/rocksdb/pull/11782.

Ran on main without https://github.com/facebook/rocksdb/pull/11782:
```
./db_compaction_test --gtest_filter='*ErrorWhenReadFileHead'
Note: Google Test filter = *ErrorWhenReadFileHead
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from DBCompactionTest
[ RUN      ] DBCompactionTest.ErrorWhenReadFileHead
db/db_compaction_test.cc:10105: Failure
Value of: s.IsIOError()
  Actual: false
Expected: true
[  FAILED  ] DBCompactionTest.ErrorWhenReadFileHead (3960 ms)

./db_iterator_test --gtest_filter="*ErrorWhenReadFile*"
Note: Google Test filter = *ErrorWhenReadFile*
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from DBIteratorTest
[ RUN      ] DBIteratorTest.ErrorWhenReadFile
db/db_iterator_test.cc:3399: Failure
Value of: (iter->status()).ok()
  Actual: true
Expected: false
[  FAILED  ] DBIteratorTest.ErrorWhenReadFile (280 ms)
[----------] 1 test from DBIteratorTest (280 ms total)
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11788

Reviewed By: ajkr

Differential Revision: D48940284

Pulled By: cbi42

fbshipit-source-id: 06f3c5963f576db3f85d305ffb2745ee13d209bb
This commit is contained in:
Changyu Bi 2023-09-06 10:23:41 -07:00 committed by Facebook GitHub Bot
parent 3f54b9696c
commit 458acf8169
4 changed files with 257 additions and 0 deletions

View file

@ -10037,6 +10037,83 @@ TEST_F(DBCompactionTest, VerifyRecordCount) {
"processed.";
ASSERT_TRUE(std::strstr(s.getState(), expect));
}
TEST_F(DBCompactionTest, ErrorWhenReadFileHead) {
// This is to test a bug that is fixed in
// https://github.com/facebook/rocksdb/pull/11782.
//
// Ingest error when reading from a file with offset = 0,
// See if compaction handles it correctly.
Options opts = CurrentOptions();
opts.num_levels = 7;
opts.compression = kNoCompression;
DestroyAndReopen(opts);
// Set up LSM
// L5: F1 [key0, key99], F2 [key100, key199]
// L6: F3 [key50, key149]
Random rnd(301);
const int kValLen = 100;
for (int error_file = 1; error_file <= 3; ++error_file) {
for (int i = 50; i < 150; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValLen)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(6);
std::vector<std::string> values;
for (int i = 0; i < 100; ++i) {
values.emplace_back(rnd.RandomString(kValLen));
ASSERT_OK(Put(Key(i), values.back()));
}
ASSERT_OK(Flush());
MoveFilesToLevel(5);
for (int i = 100; i < 200; ++i) {
values.emplace_back(rnd.RandomString(kValLen));
ASSERT_OK(Put(Key(i), values.back()));
}
ASSERT_OK(Flush());
MoveFilesToLevel(5);
ASSERT_EQ(2, NumTableFilesAtLevel(5));
ASSERT_EQ(1, NumTableFilesAtLevel(6));
std::atomic_int count = 0;
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::Read::BeforeReturn",
[&count, &error_file](void* pair_ptr) {
auto p =
reinterpret_cast<std::pair<std::string*, IOStatus*>*>(pair_ptr);
int cur = ++count;
if (cur == error_file) {
IOStatus* io_s = p->second;
*io_s = IOStatus::IOError();
io_s->SetRetryable(true);
}
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Failed compaction should not lose data.
PinnableSlice slice;
for (int i = 0; i < 200; ++i) {
ASSERT_OK(Get(Key(i), &slice));
ASSERT_EQ(slice, values[i]);
}
ASSERT_NOK(s);
ASSERT_TRUE(s.IsIOError());
s = db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(s);
for (int i = 0; i < 200; ++i) {
ASSERT_OK(Get(Key(i), &slice));
ASSERT_EQ(slice, values[i]);
}
SyncPoint::GetInstance()->DisableProcessing();
DestroyAndReopen(opts);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View file

@ -3295,6 +3295,176 @@ TEST_F(DBIteratorTest, IteratorRefreshReturnSV) {
Close();
}
TEST_F(DBIteratorTest, ErrorWhenReadFile) {
// This is to test a bug that is fixed in
// https://github.com/facebook/rocksdb/pull/11782.
//
// Ingest error when reading from a file, and
// see if Iterator handles it correctly.
Options opts = CurrentOptions();
opts.num_levels = 7;
opts.compression = kNoCompression;
BlockBasedTableOptions bbto;
// Always do I/O
bbto.no_block_cache = true;
opts.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(opts);
// Set up LSM
// L5: F1 [key0, key99], F2 [key100, key199]
// L6: F3 [key50, key149]
Random rnd(301);
const int kValLen = 100;
for (int i = 50; i < 150; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValLen)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(6);
std::vector<std::string> values;
for (int i = 0; i < 100; ++i) {
values.emplace_back(rnd.RandomString(kValLen));
ASSERT_OK(Put(Key(i), values.back()));
}
ASSERT_OK(Flush());
MoveFilesToLevel(5);
for (int i = 100; i < 200; ++i) {
values.emplace_back(rnd.RandomString(kValLen));
ASSERT_OK(Put(Key(i), values.back()));
}
ASSERT_OK(Flush());
MoveFilesToLevel(5);
ASSERT_EQ(2, NumTableFilesAtLevel(5));
ASSERT_EQ(1, NumTableFilesAtLevel(6));
std::vector<LiveFileMetaData> files;
db_->GetLiveFilesMetaData(&files);
// Get file names for F1, F2 and F3.
// These are file names, not full paths.
std::string f1, f2, f3;
for (auto& file_meta : files) {
if (file_meta.level == 6) {
f3 = file_meta.name;
} else {
if (file_meta.smallestkey == Key(0)) {
f1 = file_meta.name;
} else {
f2 = file_meta.name;
}
}
}
ASSERT_TRUE(!f1.empty());
ASSERT_TRUE(!f2.empty());
ASSERT_TRUE(!f3.empty());
std::string error_file;
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::Read::BeforeReturn",
[&error_file](void* io_s_ptr) {
auto p =
reinterpret_cast<std::pair<std::string*, IOStatus*>*>(io_s_ptr);
if (p->first->find(error_file) != std::string::npos) {
*p->second = IOStatus::IOError();
p->second->SetRetryable(true);
}
});
SyncPoint::GetInstance()->EnableProcessing();
// Error reading F1
error_file = f1;
std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
iter->SeekToFirst();
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
// This does not require reading the first block.
iter->Seek(Key(90));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[90]);
// iter has ok status before this Seek.
iter->Seek(Key(1));
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
// Error reading F2
error_file = f2;
iter.reset(db_->NewIterator(ReadOptions()));
iter->Seek(Key(99));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[99]);
// Need to read from F2.
iter->Next();
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
iter->Seek(Key(190));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[190]);
// Seek for first key of F2.
iter->Seek(Key(100));
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
iter->SeekToLast();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[199]);
// SeekForPrev for first key of F2.
iter->SeekForPrev(Key(100));
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
// Does not read first block (offset 0).
iter->SeekForPrev(Key(98));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[98]);
// Error reading F3
error_file = f3;
iter.reset(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
iter->Seek(Key(50));
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
iter->SeekForPrev(Key(50));
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
// Does not read file 3
iter->Seek(Key(150));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[150]);
// Test when file read error occurs during Prev().
// This requires returning an error when reading near the end of a file
// instead of offset 0.
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::Read::AnyOffset", [&f1](void* pair_ptr) {
auto p =
reinterpret_cast<std::pair<std::string*, IOStatus*>*>(pair_ptr);
if (p->first->find(f1) != std::string::npos) {
*p->second = IOStatus::IOError();
p->second->SetRetryable(true);
}
});
iter->SeekForPrev(Key(101));
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->value(), values[101]);
// DBIter will not stop at Key(100) since it needs
// to make sure the key it returns has the max sequence number for Key(100).
// So it will call MergingIterator::Prev() which will read F1.
iter->Prev();
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsIOError());
SyncPoint::GetInstance()->DisableProcessing();
iter->Reset();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View file

@ -472,6 +472,8 @@ const Status& ErrorHandler::SetBGError(const Status& bg_status,
ROCKS_LOG_INFO(
db_options_.info_log,
"ErrorHandler: Compaction will schedule by itself to resume\n");
// Not used in this code path.
new_bg_io_err.PermitUncheckedError();
return bg_error_;
} else if (BackgroundErrorReason::kFlushNoWAL == reason ||
BackgroundErrorReason::kManifestWriteNoWAL == reason) {

View file

@ -271,6 +271,14 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
file_read_hist_->Add(elapsed);
}
#ifndef NDEBUG
auto pair = std::make_pair(&file_name_, &io_s);
if (offset == 0) {
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
&pair);
}
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
#endif
return io_s;
}