diff --git a/db/db_impl.cc b/db/db_impl.cc
index 0abe140124..d4e6482122 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -957,7 +957,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
           info_log, "%s%s: dropping %d bytes; %s",
           (this->status == nullptr ? "(ignoring error) " : ""),
           fname, static_cast<int>(bytes), s.ToString().c_str());
-      if (this->status != nullptr && this->status->ok()) *this->status = s;
+      if (this->status != nullptr && this->status->ok()) {
+        *this->status = s;
+      }
     }
   };
 
@@ -983,6 +985,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     stream.EndArray();
   }
 
+  bool continue_replay_log = true;
   for (auto log_number : log_numbers) {
     // The previous incarnation may not have written any MANIFEST
     // records after allocating this log number. So we manually
@@ -1008,21 +1011,56 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     reporter.env = env_;
     reporter.info_log = db_options_.info_log.get();
     reporter.fname = fname.c_str();
-    reporter.status = (db_options_.paranoid_checks) ? &status : nullptr;
+    if (!db_options_.paranoid_checks ||
+        db_options_.wal_recovery_mode ==
+            WALRecoveryMode::kSkipAnyCorruptedRecords) {
+      reporter.status = nullptr;
+    } else {
+      reporter.status = &status;
+    }
     // We intentially make log::Reader do checksumming even if
     // paranoid_checks==false so that corruptions cause entire commits
     // to be skipped instead of propagating bad information (like overly
     // large sequence numbers).
     log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                        0 /*initial_offset*/);
-    Log(InfoLogLevel::INFO_LEVEL,
-        db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);
+    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+        "Recovering log #%" PRIu64 " mode %d skip-recovery %d", log_number,
+        db_options_.wal_recovery_mode, !continue_replay_log);
+
+    // Determine if we should tolerate incomplete records at the tail end of
+    // the log
+    bool report_eof_inconsistency;
+    if (db_options_.wal_recovery_mode ==
+        WALRecoveryMode::kAbsoluteConsistency) {
+      // in a clean shutdown we don't expect any error in the log files
+      report_eof_inconsistency = true;
+    } else {
+      // for other modes ignore only incomplete records in the last log file,
+      // which is presumably due to a write in progress during restart
+      report_eof_inconsistency = false;
+
+      // TODO krad: Evaluate if we need to move to a more strict mode where we
+      // restrict the inconsistency to only the last log
+    }
 
     // Read all the records and add to a memtable
     std::string scratch;
     Slice record;
     WriteBatch batch;
-    while (reader.ReadRecord(&record, &scratch) && status.ok()) {
+
+    if (!continue_replay_log) {
+      uint64_t bytes;
+      if (env_->GetFileSize(fname, &bytes).ok()) {
+        auto info_log = db_options_.info_log.get();
+        Log(InfoLogLevel::WARN_LEVEL, info_log, "%s: dropping %d bytes",
+            fname.c_str(), static_cast<int>(bytes));
+      }
+    }
+
+    while (continue_replay_log &&
+           reader.ReadRecord(&record, &scratch, report_eof_inconsistency) &&
+           status.ok()) {
      if (record.size() < 12) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
@@ -1075,7 +1113,24 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     }
 
     if (!status.ok()) {
-      return status;
+      // The hook function is designed to ignore all IO errors from the reader
+      // during recovery for kSkipAnyCorruptedRecords. The status variable is
+      // unmodified by the reader.
+      assert(db_options_.wal_recovery_mode !=
+             WALRecoveryMode::kSkipAnyCorruptedRecords);
+      if (db_options_.wal_recovery_mode ==
+          WALRecoveryMode::kPointInTimeRecovery) {
+        // We should ignore the error but not continue replaying
+        status = Status::OK();
+        continue_replay_log = false;
+
+        Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
+            "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64,
+            log_number, *max_sequence);
+      } else if (db_options_.wal_recovery_mode !=
+                 WALRecoveryMode::kSkipAnyCorruptedRecords) {
+        return status;
+      }
     }
 
     flush_scheduler_.Clear();
diff --git a/db/db_test.cc b/db/db_test.cc
index be7e69852f..f7f9f4cc17 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include <fcntl.h>
 
 #include "db/filename.h"
 #include "db/dbformat.h"
@@ -8628,6 +8629,188 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) {
   } while (ChangeCompactOptions());
 }
 
+//
+// Test WAL recovery for the various modes available
+// TODO krad:
+// 1. Add tests when there is more than one log file
+//
+class RecoveryTestHelper {
+ public:
+  // Recreate and fill the store with some data
+  static size_t FillData(DBTest* test, const Options& options) {
+    size_t count = 0;
+
+    test->DestroyAndReopen(options);
+
+    for (int i = 0; i < 1024; i++) {
+      test->Put("key" + ToString(i), test->DummyString(10));
+      ++count;
+    }
+    return count;
+  }
+
+  // Read back all the keys we wrote and return the number of keys found
+  static size_t GetData(DBTest* test) {
+    size_t count = 0;
+    for (size_t i = 0; i < 1024; i++) {
+      if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
+        ++count;
+      }
+    }
+    return count;
+  }
+
+  // Overwrite data with 'a' from offset for length len
+  static void InduceCorruption(const std::string& filename, uint32_t offset,
+                               uint32_t len) {
+    ASSERT_GT(len, 0);
+
+    int fd = open(filename.c_str(), O_RDWR);
+
+    ASSERT_GT(fd, 0);
+    ASSERT_EQ(offset, lseek(fd, offset, SEEK_SET));
+
+    char buf[len];
+    memset(buf, 'a', len);
+    ASSERT_EQ(len, write(fd, buf, len));
+
+    close(fd);
+  }
+
+  // Corrupt the last WAL file from (filesize * off) for length (filesize * len)
+  static void CorruptWAL(DBTest* test, const double off, const double len,
+                         const bool trunc = false) {
+    rocksdb::VectorLogPtr wal_files;
+    ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files));
+    ASSERT_EQ(wal_files.size(), 1);
+    const auto logfile_path =
+        test->dbname_ + "/" + wal_files.front()->PathName();
+    auto size = wal_files.front()->SizeFileBytes();
+
+    if (trunc) {
+      ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off));
+    } else {
+      InduceCorruption(logfile_path, size * off, size * len);
+    }
+  }
+};
+
+// Test scope:
+// - We expect to open the data store when there are incomplete trailing
+//   writes at the end of any of the logs
+// - We do not expect to open the data store for corruption
+TEST_F(DBTest, kTolerateCorruptedTailRecords) {
+  for (auto trunc : {true, false}) {
+    for (int i = 0; i < 4; i++) {
+      // Fill data for testing
+      Options options = CurrentOptions();
+      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+
+      // test checksum failure or parsing
+      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+
+      if (trunc) {
+        options.wal_recovery_mode =
+            WALRecoveryMode::kTolerateCorruptedTailRecords;
+        ASSERT_OK(TryReopen(options));
+        const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+        ASSERT_TRUE(i == 0 || recovered_row_count > 0);
+        ASSERT_LT(recovered_row_count, row_count);
+      } else {
+        options.wal_recovery_mode =
+            WALRecoveryMode::kTolerateCorruptedTailRecords;
+        ASSERT_NOK(TryReopen(options));
+      }
+    }
+  }
+}
+
+// Test scope:
+// We don't expect the data store to be opened if there is any corruption
+// (leading, middle or trailing -- incomplete writes or corruption)
+TEST_F(DBTest, kAbsoluteConsistency) {
+  Options options = CurrentOptions();
+  const size_t row_count = RecoveryTestHelper::FillData(this, options);
+  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+  ASSERT_OK(TryReopen(options));
+  ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
+
+  for (auto trunc : {true, false}) {
+    for (int i = 0; i < 4; i++) {
+      if (trunc && i == 0) {
+        continue;
+      }
+      options = CurrentOptions();
+      RecoveryTestHelper::FillData(this, options);
+
+      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+      options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+      ASSERT_NOK(TryReopen(options));
+    }
+  }
+}
+
+// Test scope:
+// - We expect to open the data store under all circumstances
+// - We expect only data up to the point where the first error was encountered
TEST_F(DBTest, kPointInTimeRecovery) {
+  for (auto trunc : {true, false}) {
+    for (int i = 0; i < 4; i++) {
+      // Fill data for testing
+      Options options = CurrentOptions();
+      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+
+      // test checksum failure or parsing
+      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+
+      options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+      ASSERT_OK(TryReopen(options));
+
+      size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+      ASSERT_LT(recovered_row_count, row_count);
+
+      // verify that the keys are sequential and there is no break
+      bool expect_data = true;
+      for (size_t j = 0; j < 1024; ++j) {
+        bool found = Get("key" + ToString(j)) != "NOT_FOUND";
+        if (expect_data && !found) {
+          expect_data = false;
+        }
+        ASSERT_EQ(found, expect_data);
+      }
+
+      ASSERT_TRUE(i != 0 || recovered_row_count == 0);
+      ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2);
+    }
+  }
+}
+
+// Test scope:
+// - We expect to open the data store under all scenarios
+// - We expect to have recovered records past the corruption zone
+TEST_F(DBTest, kSkipAnyCorruptedRecords) {
+  for (auto trunc : {true, false}) {
+    for (int i = 0; i < 4; i++) {
+      // Fill data for testing
+      Options options = CurrentOptions();
+      const size_t row_count = RecoveryTestHelper::FillData(this, options);
+
+      // induce leading corruption
+      RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc);
+
+      options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
+      ASSERT_OK(TryReopen(options));
+      size_t recovered_row_count = RecoveryTestHelper::GetData(this);
+      ASSERT_LT(recovered_row_count, row_count);
+
+      if (!trunc) {
+        ASSERT_TRUE(i != 0 || recovered_row_count > 0);
+      }
+    }
+  }
+}
+
 TEST_F(DBTest, TransactionLogIteratorBatchOperations) {
   do {
     Options options = OptionsForLogIterTest();
diff --git a/db/log_reader.cc b/db/log_reader.cc
index f6514cfd3c..f3fdc18f34 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -61,7 +61,8 @@ bool Reader::SkipToInitialBlock() {
   return true;
 }
 
-bool Reader::ReadRecord(Slice* record, std::string* scratch) {
+bool Reader::ReadRecord(Slice* record, std::string* scratch,
+                        const bool report_eof_inconsistency) {
   if (last_record_offset_ < initial_offset_) {
     if (!SkipToInitialBlock()) {
       return false;
     }
   }
@@ -78,7 +79,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
   Slice fragment;
   while (true) {
     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
-    const unsigned int record_type = ReadPhysicalRecord(&fragment);
+    const unsigned int record_type =
+        ReadPhysicalRecord(&fragment, report_eof_inconsistency);
     switch (record_type) {
       case kFullType:
         if (in_fragmented_record && !scratch->empty()) {
@@ -130,6 +132,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
       case kEof:
         if (in_fragmented_record) {
+          if (report_eof_inconsistency) {
+            ReportCorruption(scratch->size(), "error reading trailing data");
+          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
@@ -238,7 +243,8 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
   }
 }
 
-unsigned int Reader::ReadPhysicalRecord(Slice* result) {
+unsigned int Reader::ReadPhysicalRecord(Slice* result,
+                                        const bool report_eof_inconsistency) {
   while (true) {
     if (buffer_.size() < (size_t)kHeaderSize) {
       if (!eof_ && !read_error_) {
@@ -259,8 +265,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
       } else {
         // Note that if buffer_ is non-empty, we have a truncated header at the
         // end of the file, which can be caused by the writer crashing in the
-        // middle of writing the header. Instead of considering this an error,
-        // just report EOF.
+        // middle of writing the header. Unless explicitly requested, we don't
+        // consider this an error, just report EOF.
+        if (buffer_.size() && report_eof_inconsistency) {
+          ReportCorruption(buffer_.size(), "truncated header");
+        }
         buffer_.clear();
         return kEof;
       }
@@ -281,7 +290,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
       }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
-      // Don't report a corruption.
+      // Don't report a corruption unless requested.
+      if (drop_size && report_eof_inconsistency) {
+        ReportCorruption(drop_size, "truncated header");
+      }
       return kEof;
     }
diff --git a/db/log_reader.h b/db/log_reader.h
index a7cf45b4a0..e6cbf47ac9 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -61,7 +61,8 @@ class Reader {
   // "*scratch" as temporary storage. The contents filled in *record
   // will only be valid until the next mutating operation on this
   // reader or the next mutation to *scratch.
-  bool ReadRecord(Slice* record, std::string* scratch);
+  bool ReadRecord(Slice* record, std::string* scratch,
+                  bool report_eof_inconsistency = false);
 
   // Returns the physical offset of the last record returned by ReadRecord.
   //
@@ -120,7 +121,8 @@ class Reader {
   bool SkipToInitialBlock();
 
   // Return type, or one of the preceding special values
-  unsigned int ReadPhysicalRecord(Slice* result);
+  unsigned int ReadPhysicalRecord(Slice* result,
+                                  bool report_eof_inconsistency = false);
 
   // Reports dropped bytes to the reporter.
   // buffer_ must be updated to remove the dropped bytes prior to invocation.
diff --git a/db/log_test.cc b/db/log_test.cc
index 816e38d1a5..74715acde0 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -208,10 +208,10 @@ class LogTest : public testing::Test {
     return dest_contents().size();
   }
 
-  std::string Read() {
+  std::string Read(const bool report_eof_inconsistency = false) {
     std::string scratch;
     Slice record;
-    if (reader_.ReadRecord(&record, &scratch)) {
+    if (reader_.ReadRecord(&record, &scratch, report_eof_inconsistency)) {
       return record.ToString();
     } else {
       return "EOF";
@@ -452,6 +452,15 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
   ASSERT_EQ("", ReportMessage());
 }
 
+TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) {
+  Write("foo");
+  ShrinkSize(4);  // Drop all payload as well as a header byte
+  ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true));
+  // Truncated last record is not ignored; it is reported as a corruption
+  ASSERT_GT(DroppedBytes(), 0U);
+  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
+}
+
 TEST_F(LogTest, BadLength) {
   const int kPayloadSize = kBlockSize - kHeaderSize;
   Write(BigString("bar", kPayloadSize));
@@ -471,6 +480,14 @@ TEST_F(LogTest, BadLengthAtEndIsIgnored) {
   ASSERT_EQ("", ReportMessage());
 }
 
+TEST_F(LogTest, BadLengthAtEndIsNotIgnored) {
+  Write("foo");
+  ShrinkSize(1);
+  ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+  ASSERT_GT(DroppedBytes(), 0U);
+  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
+}
+
 TEST_F(LogTest, ChecksumMismatch) {
   Write("foo");
   IncrementByte(0, 10);
@@ -528,6 +545,15 @@ TEST_F(LogTest, MissingLastIsIgnored) {
   ASSERT_EQ(0U, DroppedBytes());
 }
 
+TEST_F(LogTest, MissingLastIsNotIgnored) {
+  Write(BigString("bar", kBlockSize));
+  // Remove the LAST block, including header.
+  ShrinkSize(14);
+  ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+  ASSERT_GT(DroppedBytes(), 0U);
+  ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
+}
+
 TEST_F(LogTest, PartialLastIsIgnored) {
   Write(BigString("bar", kBlockSize));
   // Cause a bad record length in the LAST block.
@@ -537,6 +563,17 @@ TEST_F(LogTest, PartialLastIsIgnored) {
   ASSERT_EQ(0U, DroppedBytes());
 }
 
+TEST_F(LogTest, PartialLastIsNotIgnored) {
+  Write(BigString("bar", kBlockSize));
+  // Cause a bad record length in the LAST block.
+  ShrinkSize(1);
+  ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
+  ASSERT_GT(DroppedBytes(), 0U);
+  ASSERT_EQ("OK", MatchError(
+                      "Corruption: truncated headerCorruption: "
+                      "error reading trailing data"));
+}
+
 TEST_F(LogTest, ErrorJoinsRecords) {
   // Consider two fragmented records:
   //    first(R1) last(R1) first(R2) last(R2)
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index dd02795c6c..cd62ddb4e1 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -70,6 +70,29 @@ enum CompactionStyle : char {
   kCompactionStyleNone = 0x3,
 };
 
+enum class WALRecoveryMode : char {
+  // Original LevelDB recovery
+  // We tolerate incomplete records in trailing data on all logs
+  // Use case : This is legacy behavior (default)
+  kTolerateCorruptedTailRecords = 0x00,
+  // Recover from clean shutdown
+  // We don't expect to find any corruption in the WAL
+  // Use case : This is ideal for unit tests and rare applications that
+  // require a high consistency guarantee
+  kAbsoluteConsistency = 0x01,
+  // Recover to point-in-time consistency
+  // We stop the WAL playback on discovering WAL inconsistency
+  // Use case : Ideal for systems that have disk controller cache, such as
+  // hard disks or SSDs without a super capacitor, that store related data
+  kPointInTimeRecovery = 0x02,
+  // Recovery after a disaster
+  // We ignore any corruption in the WAL and try to salvage as much data as
+  // possible
+  // Use case : Ideal for last ditch effort to recover data or systems that
+  // operate with low grade unrelated data
+  kSkipAnyCorruptedRecords = 0x03,
+};
+
 struct CompactionOptionsFIFO {
   // once the total sum of table files reaches this, we will delete the oldest
   // table file
@@ -1028,6 +1051,10 @@ struct DBOptions {
   //
   // Default: 1MB/s
   uint64_t delayed_write_rate;
+
+  // Recovery mode to control the consistency while replaying WAL
+  // Default: kTolerateCorruptedTailRecords
+  WALRecoveryMode wal_recovery_mode;
 };
 
 // Options to control the behavior of a database (passed to DB::Open)
diff --git a/util/options.cc b/util/options.cc
index c9de119496..1244f498a2 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -242,7 +242,8 @@ DBOptions::DBOptions()
       wal_bytes_per_sync(0),
       listeners(),
       enable_thread_tracking(false),
-      delayed_write_rate(1024U * 1024U) {
+      delayed_write_rate(1024U * 1024U),
+      wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) {
 }
 
 DBOptions::DBOptions(const Options& options)
@@ -288,7 +289,8 @@ DBOptions::DBOptions(const Options& options)
       wal_bytes_per_sync(options.wal_bytes_per_sync),
       listeners(options.listeners),
      enable_thread_tracking(options.enable_thread_tracking),
-      delayed_write_rate(options.delayed_write_rate) {}
+      delayed_write_rate(options.delayed_write_rate),
+      wal_recovery_mode(options.wal_recovery_mode) {}
 
 static const char* const access_hints[] = {
   "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
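
For reference, the snippet below sketches how an application might opt into one of the new modes through DBOptions once this patch is applied. It is an illustrative example, not part of the patch; the database path and the choice of kPointInTimeRecovery are assumptions made for the sake of the example.

  #include <cassert>

  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;

    // Replay the WAL up to the first inconsistency and ignore the rest,
    // instead of failing the open (kAbsoluteConsistency) or salvaging
    // everything after the corruption (kSkipAnyCorruptedRecords).
    options.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;

    rocksdb::DB* db = nullptr;
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/wal_recovery_example", &db);
    assert(s.ok());

    // ... use the database ...

    delete db;
    return 0;
  }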