Enable recycle_log_file_num option for point in time recovery (#12403)

Summary:
This option was previously disabled due to a bug in the recovery logic. The recovery code in `DBImpl::RecoverLogFiles` couldn't tell if an EoF reported by the log reader was really an EoF or a possible corruption that made a record look like an old log record. To fix this, the log reader now explicitly reports when it encounters what looks like an old record. The recovery code treats it as a possible corruption, and uses the next sequence number in the WAL to determine if it should continue replaying the WAL.

This PR also fixes a couple of bugs that log file recycling exposed in the backup and checkpoint path.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12403

Test Plan:
1. Add new unit tests to verify behavior upon corruption
2. Re-enable disabled tests for verifying recycling behavior

Reviewed By: ajkr

Differential Revision: D54544824

Pulled By: anand1976

fbshipit-source-id: 12f5ce39bd6bc0d63b0bc6432dc4db510e0e802a
This commit is contained in:
anand76 2024-03-21 12:29:35 -07:00 committed by Facebook GitHub Bot
parent 98d8a85624
commit 63a105a481
11 changed files with 146 additions and 16 deletions

View File

@ -390,8 +390,11 @@ Status DBImpl::GetLiveFilesStorageInfo(
info.file_number = live_wal_files[i]->LogNumber();
info.file_type = kWalFile;
info.size = live_wal_files[i]->SizeFileBytes();
// Only last should need to be trimmed
info.trim_to_size = (i + 1 == wal_size);
// Trim the log either if its the last one, or log file recycling is
// enabled. In the latter case, a hard link doesn't prevent the file
// from being renamed and recycled. So we need to copy it instead.
info.trim_to_size = (i + 1 == wal_size) ||
(immutable_db_options_.recycle_log_file_num > 0);
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;

View File

@ -1757,7 +1757,11 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Check if the file has been closed, i.e wal.writer->file() == nullptr
// which can happen if log recycling is enabled, or if all the data in
// the log has been synced
if (wal.writer->file() == nullptr ||
wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);

View File

@ -163,8 +163,10 @@ IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options,
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
// TODO: plumb Env::IOActivity, Env::IOPriority
io_s = log->Close(WriteOptions());
// Normally the log file is closed when purging obsolete file, but if
// log recycling is enabled, the log file is closed here so that it
// can be reused.
io_s = log->Close(write_options);
if (!io_s.ok()) {
break;
}

View File

@ -104,7 +104,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
if (result.recycle_log_file_num &&
(result.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords ||
result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
// - kTolerateCorruptedTailRecords is inconsistent with recycle log file
// feature. WAL recycling expects recovery success upon encountering a
@ -1086,6 +1085,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
bool* old_log_record;
void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
@ -1094,10 +1094,19 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
*status = s;
}
}
void OldLogRecord(size_t bytes) override {
if (old_log_record != nullptr) {
*old_log_record = true;
}
ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled",
fname, static_cast<int>(bytes));
}
};
mutex_.AssertHeld();
Status status;
bool old_log_record = false;
std::unordered_map<int, VersionEdit> version_edits;
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
@ -1188,6 +1197,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
reporter.env = env_;
reporter.info_log = immutable_db_options_.info_log.get();
reporter.fname = fname.c_str();
reporter.old_log_record = &old_log_record;
if (!immutable_db_options_.paranoid_checks ||
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kSkipAnyCorruptedRecords) {
@ -1335,7 +1345,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
}
if (!status.ok()) {
if (!status.ok() || old_log_record) {
if (status.IsNotSupported()) {
// We should not treat NotSupported as corruption. It is rather a clear
// sign that we are processing a WAL that is produced by an incompatible
@ -1360,6 +1370,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
// We should ignore the error but not continue replaying
status = Status::OK();
old_log_record = false;
stop_replay_for_corruption = true;
corrupted_wal_number = wal_number;
if (corrupted_wal_found != nullptr) {

View File

@ -220,6 +220,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_options.protection_bytes_per_key != 8) {
return Status::InvalidArgument(
"`WriteOptions::protection_bytes_per_key` must be zero or eight");
} else if (write_options.disableWAL &&
immutable_db_options_.recycle_log_file_num > 0) {
return Status::InvalidArgument(
"WriteOptions::disableWAL option is not supported if "
"DBOptions::recycle_log_file_num > 0");
}
// TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
// grabs but does not seem thread-safe.
@ -2173,8 +2178,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log_write_mutex_.Unlock();
}
uint64_t recycle_log_number = 0;
// If file deletion is disabled, don't recycle logs since it'll result in
// the file getting renamed
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
!log_recycle_files_.empty()) {
!log_recycle_files_.empty() && IsFileDeletionsEnabled()) {
recycle_log_number = log_recycle_files_.front();
}
uint64_t new_log_number =

View File

@ -1123,15 +1123,13 @@ TEST_F(DBWALTest, PreallocateBlock) {
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)
TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
// TODO(ajkr): Disabled until WAL recycling is fixed for
// `kPointInTimeRecovery`.
TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
// For github issue #1303
for (int i = 0; i < 2; ++i) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.recycle_log_file_num = 2;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
if (i != 0) {
options.wal_dir = alternative_wal_dir_;
}
@ -1162,16 +1160,14 @@ TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
}
}
TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
// TODO(ajkr): Disabled until WAL recycling is fixed for
// `kPointInTimeRecovery`.
TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
// Ensures full purge cannot delete a WAL while it's in the process of being
// recycled. In particular, we force the full purge after a file has been
// chosen for reuse, but before it has been renamed.
for (int i = 0; i < 2; ++i) {
Options options = CurrentOptions();
options.recycle_log_file_num = 1;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
if (i != 0) {
options.wal_dir = alternative_wal_dir_;
}

View File

@ -821,6 +821,95 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
ASSERT_LE(bytes_num, 1024 * 100);
}
void CorruptLogFile(Env* env, Options& options, std::string log_path,
uint64_t log_num, int record_num) {
std::shared_ptr<FileSystem> fs = env->GetFileSystem();
std::unique_ptr<SequentialFileReader> file_reader;
Status status;
{
std::unique_ptr<FSSequentialFile> file;
status = fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr);
ASSERT_EQ(status, IOStatus::OK());
file_reader.reset(new SequentialFileReader(std::move(file), log_path));
}
std::unique_ptr<log::Reader> reader(new log::Reader(
nullptr, std::move(file_reader), nullptr, false, log_num));
std::string scratch;
Slice record;
uint64_t record_checksum;
for (int i = 0; i < record_num; ++i) {
ASSERT_TRUE(reader->ReadRecord(&record, &scratch, options.wal_recovery_mode,
&record_checksum));
}
uint64_t rec_start = reader->LastRecordOffset();
reader.reset();
{
std::unique_ptr<FSRandomRWFile> file;
status = fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr);
ASSERT_EQ(status, IOStatus::OK());
uint32_t bad_lognum = 0xff;
ASSERT_EQ(file->Write(
rec_start + 7,
Slice(reinterpret_cast<char*>(&bad_lognum), sizeof(uint32_t)),
IOOptions(), nullptr),
IOStatus::OK());
ASSERT_OK(file->Close(IOOptions(), nullptr));
file.reset();
}
}
TEST_P(DBWriteTest, RecycleLogTest) {
Options options = GetOptions();
options.recycle_log_file_num = 0;
options.avoid_flush_during_recovery = true;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
Reopen(options);
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Put(Key(2), "val1"));
uint64_t latest_log_num = 0;
std::unique_ptr<LogFile> log_file;
ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
latest_log_num = log_file->LogNumber();
Reopen(options);
ASSERT_OK(Put(Key(3), "val3"));
// Corrupt second entry of first log
std::string log_path = LogFileName(dbname_, latest_log_num);
CorruptLogFile(env_, options, log_path, latest_log_num, 2);
Reopen(options);
ASSERT_EQ(Get(Key(1)), "val1");
ASSERT_EQ(Get(Key(2)), "NOT_FOUND");
ASSERT_EQ(Get(Key(3)), "NOT_FOUND");
}
TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) {
Options options = GetOptions();
options.recycle_log_file_num = 0;
options.avoid_flush_during_recovery = true;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, Key(1), "val1"));
ASSERT_OK(Put(0, Key(2), "val2"));
uint64_t latest_log_num = 0;
std::unique_ptr<LogFile> log_file;
ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
latest_log_num = log_file->LogNumber();
ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, Key(3), "val3"));
// Corrupt second entry of first log
std::string log_path = LogFileName(dbname_, latest_log_num);
CorruptLogFile(env_, options, log_path, latest_log_num, 2);
ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options),
Status::Corruption());
}
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,

View File

@ -258,6 +258,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
} else {
if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
ReportOldLogRecord(scratch->size());
}
}
return false;
}
@ -405,6 +409,12 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
}
}
void Reader::ReportOldLogRecord(size_t bytes) {
if (reporter_ != nullptr) {
reporter_->OldLogRecord(bytes);
}
}
bool Reader::ReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip

View File

@ -45,6 +45,8 @@ class Reader {
// Some corruption was detected. "size" is the approximate number
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;
virtual void OldLogRecord(size_t /*bytes*/) {}
};
// Create a reader that will return log records from "*file".
@ -202,6 +204,7 @@ class Reader {
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
void ReportOldLogRecord(size_t bytes);
void InitCompression(const CompressionTypeRecord& compression_record);

View File

@ -771,6 +771,10 @@ def finalize_and_sanitize(src_params):
# disable atomic flush.
if dest_params["test_best_efforts_recovery"] == 0:
dest_params["disable_wal"] = 0
if dest_params.get("disable_wal") == 1:
# disableWAL and recycle_log_file_num options are not mutually
# compatible at the moment
dest_params["recycle_log_file_num"] = 0
return dest_params

View File

@ -0,0 +1 @@
Re-enable the recycle_log_file_num option in DBOptions for kPointInTimeRecovery WAL recovery mode, which was previously disabled due to a bug in the recovery logic. This option is incompatible with WriteOptions::disableWAL. A Status::InvalidArgument() will be returned if disableWAL is specified.