From ee258619be968406d86522766d2d462426ca77fb Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 8 Nov 2024 12:43:21 -0800 Subject: [PATCH] Fix missing cases of corruption retries (#13122) Summary: This PR fixes a few cases where RocksDB was not retrying checksum failure/corruption of file reads with the `verify_and_reconstruct_read` IO option. After fixing these cases, we can almost always successfully open the DB and execute reads even if we see transient corruptions, provided the `FileSystem` supports the `verify_and_reconstruct_read` option. The specific cases fixed in this PR are - 1. CURRENT file 2. IDENTITY file 3. OPTIONS file 4. SST footer Pull Request resolved: https://github.com/facebook/rocksdb/pull/13122 Test Plan: Unit test in `db_io_failure_test.cc` that injects corruption at various stages of DB open and reads Reviewed By: jaykorean Differential Revision: D65617982 Pulled By: anand1976 fbshipit-source-id: 4324b88cc7eee5501ab5df20ef7a95bb12ed3ea7 --- db/db_impl/db_impl.cc | 5 +- db/db_impl/db_impl.h | 5 +- db/db_impl/db_impl_files.cc | 9 +- db/db_impl/db_impl_open.cc | 6 +- db/db_impl/db_impl_readonly.cc | 3 +- db/db_io_failure_test.cc | 170 +++++++++++++++++- db/version_set.cc | 20 ++- db/version_set.h | 2 +- db/version_set_test.cc | 12 +- env/file_system.cc | 7 +- include/rocksdb/file_system.h | 4 + options/options_parser.cc | 132 ++++++++------ table/format.cc | 6 +- table/meta_blocks.cc | 2 +- .../bug_fixes/missing_strong_reads.md | 1 + 15 files changed, 290 insertions(+), 94 deletions(-) create mode 100644 unreleased_history/bug_fixes/missing_strong_reads.md diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index b1ea2e33c8..9cb2c465d9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5203,11 +5203,12 @@ Status DBImpl::GetDbIdentity(std::string& identity) const { return Status::OK(); } -Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const { +Status DBImpl::GetDbIdentityFromIdentityFile(const 
IOOptions& opts, + std::string* identity) const { std::string idfilename = IdentityFileName(dbname_); const FileOptions soptions; - Status s = ReadFileToString(fs_.get(), idfilename, identity); + Status s = ReadFileToString(fs_.get(), idfilename, opts, identity); if (!s.ok()) { return s; } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f2b57f788e..b187907ff0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -482,7 +482,8 @@ class DBImpl : public DB { Status GetDbIdentity(std::string& identity) const override; - virtual Status GetDbIdentityFromIdentityFile(std::string* identity) const; + virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts, + std::string* identity) const; Status GetDbSessionId(std::string& session_id) const override; @@ -1592,7 +1593,7 @@ class DBImpl : public DB { // Read/create DB identity file (as appropriate), and write DB ID to // version_edit if provided. Status SetupDBId(const WriteOptions& write_options, bool read_only, - bool is_new_db, VersionEdit* version_edit); + bool is_new_db, bool is_retry, VersionEdit* version_edit); // Assign db_id_ and write DB ID to version_edit if provided. 
void SetDBId(std::string&& id, bool read_only, VersionEdit* version_edit); diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index bba8c064ca..4bb19774fc 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -983,7 +983,8 @@ void DBImpl::SetDBId(std::string&& id, bool read_only, } Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only, - bool is_new_db, VersionEdit* version_edit) { + bool is_new_db, bool is_retry, + VersionEdit* version_edit) { Status s; if (!is_new_db) { // Check for the IDENTITY file and create it if not there or @@ -991,7 +992,11 @@ Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only, std::string db_id_in_file; s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr); if (s.ok()) { - s = GetDbIdentityFromIdentityFile(&db_id_in_file); + IOOptions opts; + if (is_retry) { + opts.verify_and_reconstruct_read = true; + } + s = GetDbIdentityFromIdentityFile(opts, &db_id_in_file); if (s.ok() && !db_id_in_file.empty()) { if (db_id_.empty()) { // Loaded from file and wasn't already known from manifest diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 8f073711ee..e902968249 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -301,7 +301,7 @@ Status DBImpl::NewDB(std::vector* new_filenames) { VersionEdit new_db_edit; const WriteOptions write_options(Env::IOActivity::kDBOpen); Status s = SetupDBId(write_options, /*read_only=*/false, /*is_new_db=*/true, - &new_db_edit); + /*is_retry=*/false, &new_db_edit); if (!s.ok()) { return s; } @@ -676,11 +676,11 @@ Status DBImpl::Recover( // Already set up DB ID in NewDB } else if (immutable_db_options_.write_dbid_to_manifest && recovery_ctx) { VersionEdit edit; - s = SetupDBId(write_options, read_only, is_new_db, &edit); + s = SetupDBId(write_options, read_only, is_new_db, is_retry, &edit); recovery_ctx->UpdateVersionEdits( versions_->GetColumnFamilySet()->GetDefault(), 
edit); } else { - s = SetupDBId(write_options, read_only, is_new_db, nullptr); + s = SetupDBId(write_options, read_only, is_new_db, is_retry, nullptr); } assert(!s.ok() || !db_id_.empty()); ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str()); diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index e0d8d3b31a..3eedfbebb2 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -265,7 +265,8 @@ Status OpenForReadOnlyCheckExistence(const DBOptions& db_options, const std::shared_ptr& fs = db_options.env->GetFileSystem(); std::string manifest_path; uint64_t manifest_file_number; - s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), &manifest_path, + s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), /*is_retry=*/false, + &manifest_path, &manifest_file_number); } else { // Historic behavior that doesn't necessarily make sense diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc index b72c259987..b38c4e4f42 100644 --- a/db/db_io_failure_test.cc +++ b/db/db_io_failure_test.cc @@ -27,12 +27,14 @@ class CorruptionFS : public FileSystemWrapper { num_writable_file_errors_(0), corruption_trigger_(INT_MAX), read_count_(0), + corrupt_offset_(0), + corrupt_len_(0), rnd_(300), fs_buffer_(fs_buffer), verify_read_(verify_read) {} ~CorruptionFS() override { // Assert that the corruption was reset, which means it got triggered - assert(corruption_trigger_ == INT_MAX); + assert(corruption_trigger_ == INT_MAX || corrupt_len_ > 0); } const char* Name() const override { return "ErrorEnv"; } @@ -48,8 +50,10 @@ class CorruptionFS : public FileSystemWrapper { } void SetCorruptionTrigger(const int trigger) { + MutexLock l(&mutex_); corruption_trigger_ = trigger; read_count_ = 0; + corrupt_fname_.clear(); } IOStatus NewRandomAccessFile(const std::string& fname, @@ -58,25 +62,31 @@ class CorruptionFS : public FileSystemWrapper { IODebugContext* dbg) override { class CorruptionRandomAccessFile : 
public FSRandomAccessFileOwnerWrapper { public: - CorruptionRandomAccessFile(CorruptionFS& fs, + CorruptionRandomAccessFile(CorruptionFS& fs, const std::string& fname, std::unique_ptr& file) - : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {} + : FSRandomAccessFileOwnerWrapper(std::move(file)), + fs_(fs), + fname_(fname) {} IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts, Slice* result, char* scratch, IODebugContext* dbg) const override { IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg); if (opts.verify_and_reconstruct_read) { + fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset, + result->size()); return s; } + + MutexLock l(&fs_.mutex_); if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) { - fs_.read_count_ = 0; fs_.corruption_trigger_ = INT_MAX; char* data = const_cast(result->data()); std::memcpy( data, fs_.rnd_.RandomString(static_cast(result->size())).c_str(), result->size()); + fs_.SetCorruptedChunk(fname_, offset, result->size()); } return s; } @@ -101,14 +111,76 @@ class CorruptionFS : public FileSystemWrapper { return IOStatus::OK(); } + IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("Prefetch"); + } + private: CorruptionFS& fs_; + std::string fname_; }; std::unique_ptr file; IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg); EXPECT_OK(s); - result->reset(new CorruptionRandomAccessFile(*this, file)); + result->reset(new CorruptionRandomAccessFile(*this, fname, file)); + + return s; + } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + class CorruptionSequentialFile : public FSSequentialFileOwnerWrapper { + public: + CorruptionSequentialFile(CorruptionFS& fs, const std::string& fname, + std::unique_ptr& file) + : FSSequentialFileOwnerWrapper(std::move(file)), + fs_(fs), + 
fname_(fname), + offset_(0) {} + + IOStatus Read(size_t len, const IOOptions& opts, Slice* result, + char* scratch, IODebugContext* dbg) override { + IOStatus s = target()->Read(len, opts, result, scratch, dbg); + if (result->size() == 0 || + fname_.find("IDENTITY") != std::string::npos) { + return s; + } + + if (opts.verify_and_reconstruct_read) { + fs_.MaybeResetOverlapWithCorruptedChunk(fname_, offset_, + result->size()); + return s; + } + + MutexLock l(&fs_.mutex_); + if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) { + fs_.corruption_trigger_ = INT_MAX; + char* data = const_cast(result->data()); + std::memcpy( + data, + fs_.rnd_.RandomString(static_cast(result->size())).c_str(), + result->size()); + fs_.SetCorruptedChunk(fname_, offset_, result->size()); + } + offset_ += result->size(); + return s; + } + + private: + CorruptionFS& fs_; + std::string fname_; + size_t offset_; + }; + + std::unique_ptr file; + IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg); + EXPECT_OK(s); + result->reset(new CorruptionSequentialFile(*this, fname, file)); return s; } @@ -123,12 +195,40 @@ class CorruptionFS : public FileSystemWrapper { } } + void SetCorruptedChunk(const std::string& fname, size_t offset, size_t len) { + assert(corrupt_fname_.empty()); + + corrupt_fname_ = fname; + corrupt_offset_ = offset; + corrupt_len_ = len; + } + + void MaybeResetOverlapWithCorruptedChunk(const std::string& fname, + size_t offset, size_t len) { + if (fname == corrupt_fname_ && + ((offset <= corrupt_offset_ && (offset + len) > corrupt_offset_) || + (offset >= corrupt_offset_ && + offset < (corrupt_offset_ + corrupt_len_)))) { + corrupt_fname_.clear(); + } + } + + bool VerifyRetry() { return corrupt_len_ > 0 && corrupt_fname_.empty(); } + + int read_count() { return read_count_; } + + int corruption_trigger() { return corruption_trigger_; } + private: int corruption_trigger_; int read_count_; + std::string corrupt_fname_; + size_t corrupt_offset_; + size_t 
corrupt_len_; Random rnd_; bool fs_buffer_; bool verify_read_; + port::Mutex mutex_; }; } // anonymous namespace @@ -717,6 +817,7 @@ class DBIOCorruptionTest bbto.num_file_reads_for_auto_readahead = 0; options_.table_factory.reset(NewBlockBasedTableFactory(bbto)); options_.disable_auto_compactions = true; + options_.max_file_opening_threads = 0; Reopen(options_); } @@ -857,8 +958,8 @@ TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) { Status s = Flush(); if (std::get<2>(GetParam())) { ASSERT_OK(s); - ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); - ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), + ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); + ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), 1); std::string val; @@ -885,8 +986,8 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) { if (std::get<2>(GetParam())) { ASSERT_OK(ReopenDB()); - ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); - ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), + ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); + ASSERT_GT(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), 1); } else { ASSERT_EQ(ReopenDB(), Status::Corruption()); @@ -970,6 +1071,57 @@ TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_P(DBIOCorruptionTest, DBOpenReadCorruptionRetry) { + if (!std::get<2>(GetParam())) { + return; + } + CorruptionFS* fs = + static_cast(env_guard_->GetFileSystem().get()); + + for (int sst = 0; sst < 3; ++sst) { + for (int key = 0; key < 100; ++key) { + std::stringstream ss; + ss << std::setw(3) << 100 * sst + key; + ASSERT_OK(Put("key" + ss.str(), "val" + ss.str())); + } + ASSERT_OK(Flush()); + } + Close(); + + // DB open will create table readers unless we reduce the table cache + // capacity. 
+ // SanitizeOptions will set max_open_files to minimum of 20. Table cache + // is allocated with max_open_files - 10 as capacity. So override + // max_open_files to 11 so table cache capacity will become 1. This will + // prevent file open during DB open and force the file to be opened + // during MultiGet + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Progressively increase the IO count trigger for corruption, and verify + // that it was retried + int corruption_trigger = 1; + fs->SetCorruptionTrigger(corruption_trigger); + do { + fs->SetCorruptionTrigger(corruption_trigger); + ASSERT_OK(ReopenDB()); + for (int sst = 0; sst < 3; ++sst) { + for (int key = 0; key < 100; ++key) { + std::stringstream ss; + ss << std::setw(3) << 100 * sst + key; + ASSERT_EQ(Get("key" + ss.str()), "val" + ss.str()); + } + } + // Verify that the injected corruption was repaired + ASSERT_TRUE(fs->VerifyRetry()); + corruption_trigger++; + } while (fs->corruption_trigger() == INT_MAX); +} + // The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption, // 3. 
Retry with verify_and_reconstruct_read IOOption INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest, diff --git a/db/version_set.cc b/db/version_set.cc index 457fddd6a8..d3b461de6f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6012,15 +6012,19 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, } Status VersionSet::GetCurrentManifestPath(const std::string& dbname, - FileSystem* fs, + FileSystem* fs, bool is_retry, std::string* manifest_path, uint64_t* manifest_file_number) { assert(fs != nullptr); assert(manifest_path != nullptr); assert(manifest_file_number != nullptr); + IOOptions opts; std::string fname; - Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname); + if (is_retry) { + opts.verify_and_reconstruct_read = true; + } + Status s = ReadFileToString(fs, CurrentFileName(dbname), opts, &fname); if (!s.ok()) { return s; } @@ -6050,8 +6054,8 @@ Status VersionSet::Recover( // Read "CURRENT" file, which contains a pointer to the current manifest // file std::string manifest_path; - Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, - &manifest_file_number_); + Status s = GetCurrentManifestPath(dbname_, fs_.get(), is_retry, + &manifest_path, &manifest_file_number_); if (!s.ok()) { return s; } @@ -6296,8 +6300,8 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, // Read "CURRENT" file, which contains a pointer to the current manifest file std::string manifest_path; uint64_t manifest_file_number; - Status s = - GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number); + Status s = GetCurrentManifestPath(dbname, fs, /*is_retry=*/false, + &manifest_path, &manifest_file_number); if (!s.ok()) { return s; } @@ -7495,8 +7499,8 @@ Status ReactiveVersionSet::MaybeSwitchManifest( assert(manifest_reader != nullptr); Status s; std::string manifest_path; - s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path, - &manifest_file_number_); + s = 
GetCurrentManifestPath(dbname_, fs_.get(), /*is_retry=*/false, + &manifest_path, &manifest_file_number_); if (!s.ok()) { return s; } diff --git a/db/version_set.h b/db/version_set.h index 18a2af62f5..95f0e093f1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1278,7 +1278,7 @@ class VersionSet { {}); static Status GetCurrentManifestPath(const std::string& dbname, - FileSystem* fs, + FileSystem* fs, bool is_retry, std::string* manifest_filename, uint64_t* manifest_file_number); void WakeUpWaitingManifestWriters(); diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 04b9503e77..ae15a4637b 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1223,7 +1223,7 @@ class VersionSetTestBase { tmp_db_options.env = env_; std::unique_ptr impl(new DBImpl(tmp_db_options, dbname_)); std::string db_id; - ASSERT_OK(impl->GetDbIdentityFromIdentityFile(&db_id)); + ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id)); new_db.SetDBId(db_id); } new_db.SetLogNumber(0); @@ -1391,7 +1391,8 @@ class VersionSetTestBase { assert(manifest_path != nullptr); uint64_t manifest_file_number = 0; Status s = versions_->GetCurrentManifestPath( - dbname_, fs_.get(), manifest_path, &manifest_file_number); + dbname_, fs_.get(), /*is_retry=*/false, manifest_path, + &manifest_file_number); ASSERT_OK(s); } @@ -1399,7 +1400,8 @@ class VersionSetTestBase { assert(manifest_path != nullptr); uint64_t manifest_file_number = 0; Status s = versions_->GetCurrentManifestPath( - dbname_, fs_.get(), manifest_path, &manifest_file_number); + dbname_, fs_.get(), /*is_retry=*/false, manifest_path, + &manifest_file_number); ASSERT_OK(s); ASSERT_EQ(1, manifest_file_number); } @@ -3515,7 +3517,7 @@ class VersionSetTestEmptyDb tmp_db_options.env = env_; std::unique_ptr impl(new DBImpl(tmp_db_options, dbname_)); std::string db_id; - ASSERT_OK(impl->GetDbIdentityFromIdentityFile(&db_id)); + ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id)); 
new_db.SetDBId(db_id); } const std::string manifest_path = DescriptorFileName(dbname_, 1); @@ -3839,7 +3841,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, tmp_db_options.env = env_; std::unique_ptr impl(new DBImpl(tmp_db_options, dbname_)); std::string db_id; - ASSERT_OK(impl->GetDbIdentityFromIdentityFile(&db_id)); + ASSERT_OK(impl->GetDbIdentityFromIdentityFile(IOOptions(), &db_id)); new_db.SetDBId(db_id); } { diff --git a/env/file_system.cc b/env/file_system.cc index 1f02f7a7ee..fad48cc117 100644 --- a/env/file_system.cc +++ b/env/file_system.cc @@ -200,6 +200,11 @@ IOStatus WriteStringToFile(FileSystem* fs, const Slice& data, IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, std::string* data) { + return ReadFileToString(fs, fname, IOOptions(), data); +} + +IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, + const IOOptions& opts, std::string* data) { FileOptions soptions; data->clear(); std::unique_ptr file; @@ -212,7 +217,7 @@ IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, char* space = new char[kBufferSize]; while (true) { Slice fragment; - s = file->Read(kBufferSize, IOOptions(), &fragment, space, nullptr); + s = file->Read(kBufferSize, opts, &fragment, space, nullptr); if (!s.ok()) { break; } diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 042b38305c..9d4aaba8f2 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -1961,4 +1961,8 @@ IOStatus WriteStringToFile(FileSystem* fs, const Slice& data, IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, std::string* data); +// A utility routine: read contents of named file into *data +IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, + const IOOptions& opts, std::string* data); + } // namespace ROCKSDB_NAMESPACE diff --git a/options/options_parser.cc b/options/options_parser.cc index 408e6d7ab9..e5abf3b56d 100644 --- a/options/options_parser.cc 
+++ b/options/options_parser.cc @@ -12,6 +12,7 @@ #include #include +#include "file/file_util.h" #include "file/line_file_reader.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" @@ -268,70 +269,89 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, Reset(); ConfigOptions config_options = config_options_in; - std::unique_ptr seq_file; - Status s = fs->NewSequentialFile(file_name, FileOptions(), &seq_file, - nullptr); - if (!s.ok()) { - return s; - } - LineFileReader lf_reader(std::move(seq_file), file_name, - config_options.file_readahead_size); - - OptionSection section = kOptionSectionUnknown; - std::string title; - std::string argument; - std::unordered_map opt_map; - std::string line; - // we only support single-lined statement. - while (lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { - int line_num = static_cast(lf_reader.GetLineNumber()); - line = TrimAndRemoveComment(line); - if (line.empty()) { - continue; + Status s; + bool retry = false; + do { + std::unique_ptr seq_file; + s = fs->NewSequentialFile(file_name, FileOptions(), &seq_file, nullptr); + if (!s.ok()) { + return s; } - if (IsSection(line)) { + + LineFileReader lf_reader( + std::move(seq_file), file_name, config_options.file_readahead_size, + nullptr, std::vector>{}, nullptr, retry); + + OptionSection section = kOptionSectionUnknown; + std::string title; + std::string argument; + std::unordered_map opt_map; + std::string line; + // we only support single-lined statement. 
+ while ( + lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { + int line_num = static_cast(lf_reader.GetLineNumber()); + line = TrimAndRemoveComment(line); + if (line.empty()) { + continue; + } + if (IsSection(line)) { + s = EndSection(config_options, section, title, argument, opt_map); + opt_map.clear(); + if (!s.ok()) { + break; + } + + // If the option file is not generated by a higher version, unknown + // option should only mean corruption. + if (config_options.ignore_unknown_options && + section == kOptionSectionVersion) { + using VTuple = std::tuple; + if (VTuple(db_version[0], db_version[1], db_version[2]) <= + VTuple(ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH)) { + config_options.ignore_unknown_options = false; + } + } + + s = ParseSection(&section, &title, &argument, line, line_num); + if (!s.ok()) { + break; + } + } else { + std::string name; + std::string value; + s = ParseStatement(&name, &value, line, line_num); + if (!s.ok()) { + break; + } + opt_map.insert({name, value}); + } + } + if (s.ok()) { + s = lf_reader.GetStatus(); + } + if (s.ok()) { s = EndSection(config_options, section, title, argument, opt_map); opt_map.clear(); - if (!s.ok()) { - return s; - } - - // If the option file is not generated by a higher version, unknown - // option should only mean corruption. 
- if (config_options.ignore_unknown_options && - section == kOptionSectionVersion) { - using VTuple = std::tuple; - if (VTuple(db_version[0], db_version[1], db_version[2]) <= - VTuple(ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH)) { - config_options.ignore_unknown_options = false; - } - } - - s = ParseSection(&section, &title, &argument, line, line_num); - if (!s.ok()) { + } + if (s.ok()) { + s = ValidityCheck(); + } + if (!s.ok()) { + if ((s.IsCorruption() || s.IsInvalidArgument()) && !retry && + CheckFSFeatureSupport(fs, + FSSupportedOps::kVerifyAndReconstructRead)) { + retry = true; + Reset(); + } else { return s; } } else { - std::string name; - std::string value; - s = ParseStatement(&name, &value, line, line_num); - if (!s.ok()) { - return s; - } - opt_map.insert({name, value}); + return s; } - } - s = lf_reader.GetStatus(); - if (!s.ok()) { - return s; - } + } while (retry); - s = EndSection(config_options, section, title, argument, opt_map); - opt_map.clear(); - if (!s.ok()) { - return s; - } - return ValidityCheck(); + return s; } Status RocksDBOptionsParser::CheckSection(const OptionSection section, diff --git a/table/format.cc b/table/format.cc index 7e1c2817dd..46de42fbe9 100644 --- a/table/format.cc +++ b/table/format.cc @@ -560,9 +560,9 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, IOOptions new_opts = opts; new_opts.verify_and_reconstruct_read = true; footer->Reset(); - s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer, - file_size, footer, - enforce_table_magic_number); + s = ReadFooterFromFileInternal(new_opts, file, fs, + /*prefetch_buffer=*/nullptr, file_size, + footer, enforce_table_magic_number); RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT); if (s.ok()) { RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT); diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 28923fb563..7d6ab76e29 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -573,7 +573,7 @@ Status 
ReadMetaIndexBlockInFile(RandomAccessFileReader* file, return s; } s = ReadFooterFromFile(opts, file, *ioptions.fs, prefetch_buffer, file_size, - &footer, table_magic_number); + &footer, table_magic_number, ioptions.stats); if (!s.ok()) { return s; } diff --git a/unreleased_history/bug_fixes/missing_strong_reads.md b/unreleased_history/bug_fixes/missing_strong_reads.md new file mode 100644 index 0000000000..b0d10c4b15 --- /dev/null +++ b/unreleased_history/bug_fixes/missing_strong_reads.md @@ -0,0 +1 @@ +Fix missing cases of corruption retry during DB open and read API processing.