Retry DB::Open upon a corruption detected while reading the MANIFEST (#12518)

Summary:
This PR is a counterpart of https://github.com/facebook/rocksdb/issues/12427 . On file systems that support storage level data checksum and reconstruction, retry opening the DB if a corruption is detected when reading the MANIFEST. This could be done in `log::Reader`, but its a little complicated since the sequential file would have to be reopened in order to re-read the same data, and we may miss some subtle corruptions that don't result in checksum mismatch. The approach chosen here instead is to make the decision to retry in `DBImpl::Recover`, based on either an explicit corruption in the MANIFEST file, or missing SST files due to bad data in the MANIFEST.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12518

Reviewed By: ajkr

Differential Revision: D55932155

Pulled By: anand1976

fbshipit-source-id: 51755a29b3eb14b9d8e98534adb2e7d54b12ced9
This commit is contained in:
anand76 2024-04-18 17:36:33 -07:00 committed by Facebook GitHub Bot
parent ef38d99edc
commit 97991960e9
13 changed files with 165 additions and 60 deletions

View File

@ -1057,7 +1057,8 @@ class DBImpl : public DB {
static Status Open(const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn);
const bool seq_per_batch, const bool batch_per_txn,
const bool is_retry, bool* can_retry);
static IOStatus CreateAndNewDirectory(
FileSystem* fs, const std::string& dirname,
@ -1546,9 +1547,9 @@ class DBImpl : public DB {
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
bool error_if_data_exists_in_wals = false, bool is_retry = false,
uint64_t* recovered_seq = nullptr,
RecoveryContext* recovery_ctx = nullptr);
RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr);
virtual bool OwnTablesAndLogs() const { return true; }
@ -1928,7 +1929,7 @@ class DBImpl : public DB {
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found,
bool is_retry, bool* corrupted_log_found,
RecoveryContext* recovery_ctx);
// The following two methods are used to flush a memtable to

View File

@ -413,7 +413,8 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
bool is_retry, uint64_t* recovered_seq, RecoveryContext* recovery_ctx,
bool* can_retry) {
mutex_.AssertHeld();
const WriteOptions write_options(Env::IOActivity::kDBOpen);
@ -529,7 +530,31 @@ Status DBImpl::Recover(
Status s;
bool missing_table_file = false;
if (!immutable_db_options_.best_efforts_recovery) {
s = versions_->Recover(column_families, read_only, &db_id_);
// Status of reading the descriptor file
Status desc_status;
s = versions_->Recover(column_families, read_only, &db_id_,
/*no_error_if_files_missing=*/false, is_retry,
&desc_status);
desc_status.PermitUncheckedError();
if (can_retry) {
// If we're opening for the first time and the failure is likely due to
// a corrupt MANIFEST file (could result in either the log::Reader
// detecting a corrupt record, or SST files not found error due to
// discarding badly formed tail records)
if (!is_retry &&
(desc_status.IsCorruption() || s.IsNotFound() || s.IsCorruption()) &&
CheckFSFeatureSupport(fs_.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
*can_retry = true;
ROCKS_LOG_ERROR(
immutable_db_options_.info_log,
"Possible corruption detected while replaying MANIFEST %s, %s. "
"Will be retried.",
desc_status.ToString().c_str(), s.ToString().c_str());
} else {
*can_retry = false;
}
}
} else {
assert(!files_in_dbname.empty());
s = versions_->TryRecover(column_families, read_only, files_in_dbname,
@ -767,8 +792,8 @@ Status DBImpl::Recover(
std::sort(wals.begin(), wals.end());
bool corrupted_wal_found = false;
s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
recovery_ctx);
s = RecoverLogFiles(wals, &next_sequence, read_only, is_retry,
&corrupted_wal_found, recovery_ctx);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
@ -1078,7 +1103,7 @@ bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found,
bool is_retry, bool* corrupted_wal_found,
RecoveryContext* recovery_ctx) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
@ -1189,7 +1214,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
file_reader.reset(new SequentialFileReader(
std::move(file), fname, immutable_db_options_.log_readahead_size,
io_tracer_));
io_tracer_, /*listeners=*/{}, /*rate_limiter=*/nullptr, is_retry));
}
// Create the log reader.
@ -1833,8 +1858,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const bool kBatchPerTxn = true;
ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
Status s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn);
bool can_retry = false;
Status s;
do {
s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn, can_retry, &can_retry);
} while (!s.ok() && can_retry);
ThreadStatusUtil::ResetThreadStatus();
return s;
}
@ -1956,7 +1985,8 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn) {
const bool seq_per_batch, const bool batch_per_txn,
const bool is_retry, bool* can_retry) {
const WriteOptions write_options(Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
@ -2029,8 +2059,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
uint64_t recovered_seq(kMaxSequenceNumber);
s = impl->Recover(column_families, false /* read_only */,
false /* error_if_wal_file_exists */,
false /* error_if_data_exists_in_wals */, &recovered_seq,
&recovery_ctx);
false /* error_if_data_exists_in_wals */, is_retry,
&recovered_seq, &recovery_ctx, can_retry);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;

View File

@ -33,8 +33,8 @@ DBImplSecondary::~DBImplSecondary() = default;
Status DBImplSecondary::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/) {
bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
mutex_.AssertHeld();
JobContext job_context(0);

View File

@ -82,8 +82,9 @@ class DBImplSecondary : public DBImpl {
// and log_readers_ to facilitate future operations.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, bool error_if_wal_file_exists,
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
RecoveryContext* recovery_ctx = nullptr) override;
bool error_if_data_exists_in_wals, bool is_retry = false,
uint64_t* = nullptr, RecoveryContext* recovery_ctx = nullptr,
bool* can_retry = nullptr) override;
// Can return IOError due to files being deleted by the primary. To avoid
// IOError in this case, application can coordinate between primary and

View File

@ -21,14 +21,15 @@ class CorruptionFS : public FileSystemWrapper {
int num_writable_file_errors_;
explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target,
bool fs_buffer)
bool fs_buffer, bool verify_read)
: FileSystemWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0),
corruption_trigger_(INT_MAX),
read_count_(0),
rnd_(300),
fs_buffer_(fs_buffer) {}
fs_buffer_(fs_buffer),
verify_read_(verify_read) {}
~CorruptionFS() override {
// Assert that the corruption was reset, which means it got triggered
assert(corruption_trigger_ == INT_MAX);
@ -113,11 +114,13 @@ class CorruptionFS : public FileSystemWrapper {
}
void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
1 << FSSupportedOps::kAsyncIO;
supported_ops = 1 << FSSupportedOps::kAsyncIO;
if (fs_buffer_) {
supported_ops |= 1 << FSSupportedOps::kFSBuffer;
}
if (verify_read_) {
supported_ops |= 1 << FSSupportedOps::kVerifyAndReconstructRead;
}
}
private:
@ -125,6 +128,7 @@ class CorruptionFS : public FileSystemWrapper {
int read_count_;
Random rnd_;
bool fs_buffer_;
bool verify_read_;
};
} // anonymous namespace
@ -696,23 +700,24 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
class DBIOCorruptionTest
: public DBIOFailureTest,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public:
DBIOCorruptionTest() : DBIOFailureTest() {
BlockBasedTableOptions bbto;
Options options = CurrentOptions();
options_ = CurrentOptions();
base_env_ = env_;
EXPECT_NE(base_env_, nullptr);
fs_.reset(
new CorruptionFS(base_env_->GetFileSystem(), std::get<0>(GetParam())));
fs_.reset(new CorruptionFS(base_env_->GetFileSystem(),
std::get<0>(GetParam()),
std::get<2>(GetParam())));
env_guard_ = NewCompositeEnv(fs_);
options.env = env_guard_.get();
options_.env = env_guard_.get();
bbto.num_file_reads_for_auto_readahead = 0;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.disable_auto_compactions = true;
options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
options_.disable_auto_compactions = true;
Reopen(options);
Reopen(options_);
}
~DBIOCorruptionTest() {
@ -720,10 +725,13 @@ class DBIOCorruptionTest
db_ = nullptr;
}
Status ReopenDB() { return TryReopen(options_); }
protected:
std::unique_ptr<Env> env_guard_;
std::shared_ptr<CorruptionFS> fs_;
Env* base_env_;
Options options_;
};
TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
@ -737,8 +745,13 @@ TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ReadOptions(), "key1", &val));
ASSERT_EQ(val, "val1");
Status s = dbfull()->Get(ReadOptions(), "key1", &val);
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
ASSERT_EQ(val, "val1");
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
@ -758,7 +771,11 @@ TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
while (iter->status().ok() && iter->Valid()) {
iter->Next();
}
ASSERT_OK(iter->status());
if (std::get<2>(GetParam())) {
ASSERT_OK(iter->status());
} else {
ASSERT_TRUE(iter->status().IsCorruption());
}
delete iter;
}
@ -779,8 +796,13 @@ TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
ro.async_io = std::get<1>(GetParam());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values[0].ToString(), "val1");
ASSERT_EQ(values[1].ToString(), "val2");
if (std::get<2>(GetParam())) {
ASSERT_EQ(values[0].ToString(), "val1");
ASSERT_EQ(values[1].ToString(), "val2");
} else {
ASSERT_TRUE(statuses[0].IsCorruption());
ASSERT_TRUE(statuses[1].IsCorruption());
}
}
TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
@ -793,13 +815,18 @@ TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
@ -808,17 +835,44 @@ TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
ASSERT_OK(Put("key1", "val1"));
fs->SetCorruptionTrigger(1);
ASSERT_OK(Flush());
Status s = Flush();
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_NOK(s);
}
}
TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());
ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:StartManifestRead",
[&](void* /*arg*/) { fs->SetCorruptionTrigger(0); });
SyncPoint::GetInstance()->EnableProcessing();
if (std::get<2>(GetParam())) {
ASSERT_OK(ReopenDB());
} else {
ASSERT_EQ(ReopenDB(), Status::Corruption());
}
SyncPoint::GetInstance()->DisableProcessing();
}
// The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
// 3. Retry with verify_and_reconstruct_read IOOption
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
testing::Combine(testing::Bool(), testing::Bool()));
testing::Combine(testing::Bool(), testing::Bool(),
testing::Bool()));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -6013,7 +6013,8 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool no_error_if_files_missing) {
std::string* db_id, bool no_error_if_files_missing, bool is_retry,
Status* log_status) {
const ReadOptions read_options(Env::IOActivity::kDBOpen);
// Read "CURRENT" file, which contains a pointer to the current manifest
// file
@ -6038,8 +6039,11 @@ Status VersionSet::Recover(
}
manifest_file_reader.reset(new SequentialFileReader(
std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
db_options_->log_readahead_size, io_tracer_, db_options_->listeners,
/*rate_limiter=*/nullptr, is_retry));
}
TEST_SYNC_POINT("VersionSet::Recover:StartManifestRead");
uint64_t current_manifest_file_size = 0;
uint64_t log_number = 0;
{
@ -6063,6 +6067,9 @@ Status VersionSet::Recover(
if (s.ok()) {
RecoverEpochNumbers();
}
if (log_status) {
*log_status = log_read_status;
}
}
if (s.ok()) {

View File

@ -1258,7 +1258,8 @@ class VersionSet {
// are not opened
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, std::string* db_id = nullptr,
bool no_error_if_files_missing = false);
bool no_error_if_files_missing = false, bool is_retry = false,
Status* log_status = nullptr);
Status TryRecover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only,

View File

@ -179,7 +179,8 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
std::vector<ColumnFamilyHandle*> handles;
auto open_s = DBImpl::Open(db_options, dbname, column_families, &handles,
&db, seq_per_batch_, true /* batch_per_txn */);
&db, seq_per_batch_, true /* batch_per_txn */,
false /* is_retry */, nullptr /* can_retry */);
ASSERT_OK(open_s);
assert(handles.size() == 1);
delete handles[0];

View File

@ -41,6 +41,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
IOStatus io_s;
IOOptions io_opts;
io_opts.rate_limiter_priority = rate_limiter_priority;
io_opts.verify_and_reconstruct_read = verify_and_reconstruct_read_;
if (use_direct_io()) {
//
// |-offset_advance-|---bytes returned--|

View File

@ -56,6 +56,7 @@ class SequentialFileReader {
std::atomic<size_t> offset_{0}; // read offset
std::vector<std::shared_ptr<EventListener>> listeners_{};
RateLimiter* rate_limiter_;
bool verify_and_reconstruct_read_;
public:
explicit SequentialFileReader(
@ -63,11 +64,13 @@ class SequentialFileReader {
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
RateLimiter* rate_limiter =
nullptr) // TODO: migrate call sites to provide rate limiter
nullptr, // TODO: migrate call sites to provide rate limiter
bool verify_and_reconstruct_read = false)
: file_name_(_file_name),
file_(std::move(_file), io_tracer, _file_name),
listeners_(),
rate_limiter_(rate_limiter) {
rate_limiter_(rate_limiter),
verify_and_reconstruct_read_(verify_and_reconstruct_read) {
AddFileIOListeners(listeners);
}
@ -77,12 +80,14 @@ class SequentialFileReader {
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
RateLimiter* rate_limiter =
nullptr) // TODO: migrate call sites to provide rate limiter
nullptr, // TODO: migrate call sites to provide rate limiter
bool verify_and_reconstruct_read = false)
: file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer, _file_name),
listeners_(),
rate_limiter_(rate_limiter) {
rate_limiter_(rate_limiter),
verify_and_reconstruct_read_(verify_and_reconstruct_read) {
AddFileIOListeners(listeners);
}
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,

View File

@ -0,0 +1 @@
On distributed file systems that support file system level checksum verification and reconstruction reads, RocksDB will now retry a file read if the initial read fails RocksDB block level or record level checksum verification. This applies to MANIFEST file reads when the DB is opened, and to SST file reads at all times.

View File

@ -255,7 +255,8 @@ Status TransactionDB::Open(
txn_db_options.write_policy == WRITE_COMMITTED ||
txn_db_options.write_policy == WRITE_PREPARED;
s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
use_seq_per_batch, use_batch_per_txn);
use_seq_per_batch, use_batch_per_txn,
/*is_retry=*/false, /*can_retry=*/nullptr);
if (s.ok()) {
ROCKS_LOG_WARN(db->GetDBOptions().info_log,
"Transaction write_policy is %" PRId32,

View File

@ -170,7 +170,8 @@ class TransactionTestBase : public ::testing::Test {
txn_db_options.write_policy == WRITE_COMMITTED ||
txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
use_seq_per_batch, use_batch_per_txn);
use_seq_per_batch, use_batch_per_txn,
/*is_retry=*/false, /*can_retry=*/nullptr);
auto stackable_db = std::make_unique<StackableDB>(root_db);
if (s.ok()) {
assert(root_db != nullptr);
@ -200,7 +201,8 @@ class TransactionTestBase : public ::testing::Test {
txn_db_options.write_policy == WRITE_COMMITTED ||
txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
&root_db, use_seq_per_batch, use_batch_per_txn);
&root_db, use_seq_per_batch, use_batch_per_txn,
/*is_retry=*/false, /*can_retry=*/nullptr);
if (!s.ok()) {
delete root_db;
return s;