From a28182233109f09e23e6336b5ed860ddf2c8eece Mon Sep 17 00:00:00 2001 From: Pratik Dhandharia Date: Thu, 29 Aug 2019 14:06:07 -0700 Subject: [PATCH] Lower the risk for users to run options.force_consistency_checks = true (#5744) Summary: Open-source users recently reported two occurrences of LSM-tree corruption (https://github.com/facebook/rocksdb/issues/5558 is one), which would be caught by options.force_consistency_checks = true. options.force_consistency_checks has a usability limitation because it crashes the service once inconsistency is detected. This makes the feature hard to use. Most users serve from multiple RocksDB shards per server and the impacts of crashing the service is higher than it should be. Instead, we just pass the error back to users without killing the service, and ask them to deal with the problem accordingly. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5744 Differential Revision: D17096940 Pulled By: pdhandharia fbshipit-source-id: b6780039044e265f26ed2ad03c51f4abbe8b603c --- HISTORY.md | 1 + db/db_compaction_test.cc | 24 +++++++++++ db/version_builder.cc | 86 ++++++++++++++++++++++++++++------------ db/version_builder.h | 10 ++--- db/version_set.cc | 43 +++++++++++++++----- db/version_set.h | 4 +- 6 files changed, 127 insertions(+), 41 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index da06ca2280..9bb717423b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the perfix_extractor domain and extracting the prefix before looking up. ### New Features * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. +* When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process. ### Public API Change * Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 45ba170113..49307bc783 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4658,7 +4658,31 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); Close(); } +TEST_F(DBCompactionTest, ConsistencyFailTest) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistency", [&](void* arg) { + auto p = + reinterpret_cast*>(arg); + // just swap the two FileMetaData so that we hit error + // in CheckConsistency funcion + FileMetaData* temp = *(p->first); + *(p->first) = *(p->second); + *(p->second) = temp; + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + for (int k = 0; k < 2; ++k) { + ASSERT_OK(Put("foo", "bar")); + Flush(); + } + + ASSERT_NOK(Put("foo", "bar")); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} #endif // !defined(ROCKSDB_LITE) } // namespace rocksdb diff --git a/db/version_builder.cc b/db/version_builder.cc index 9d2ba9ab4e..b97853f2d6 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -27,6 +27,7 @@ #include "db/version_set.h" #include "port/port.h" #include "table/table_reader.h" +#include "util/string_util.h" namespace rocksdb { @@ -138,12 +139,12 @@ class VersionBuilder::Rep { } } - void CheckConsistency(VersionStorageInfo* vstorage) { + Status CheckConsistency(VersionStorageInfo* vstorage) { #ifdef NDEBUG if (!vstorage->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to - return; + return Status::OK(); } #endif // make sure the files are sorted correctly @@ -152,10 +153,14 @@ class VersionBuilder::Rep { for (size_t i = 1; i < level_files.size(); i++) { auto f1 = level_files[i - 1]; auto f2 = level_files[i]; +#ifndef NDEBUG + auto pair = std::make_pair(&f1, &f2); + TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair); +#endif if (level == 0) { if (!level_zero_cmp_(f1, f2)) { fprintf(stderr, "L0 files are not sorted properly"); - abort(); + return Status::Corruption("L0 files are not sorted properly"); } if (f2->fd.smallest_seqno == f2->fd.largest_seqno) { @@ -168,7 +173,14 @@ class VersionBuilder::Rep { " vs. file with global_seqno %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, external_file_seqno); - abort(); + return Status::Corruption("L0 file with seqno " + + NumberToString(f1->fd.smallest_seqno) + + " " + + NumberToString(f1->fd.largest_seqno) + + " vs. file with global_seqno" + + NumberToString(external_file_seqno) + + " with fileNumber " + + NumberToString(f1->fd.GetNumber())); } } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) { fprintf(stderr, @@ -176,12 +188,19 @@ class VersionBuilder::Rep { " %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, f2->fd.smallest_seqno, f2->fd.largest_seqno); - abort(); + return Status::Corruption( + "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) + + " " + NumberToString(f1->fd.largest_seqno) + " " + + NumberToString(f1->fd.GetNumber()) + " vs. " + + NumberToString(f2->fd.smallest_seqno) + " " + + NumberToString(f2->fd.largest_seqno) + " " + + NumberToString(f2->fd.GetNumber())); } } else { if (!level_nonzero_cmp_(f1, f2)) { fprintf(stderr, "L%d files are not sorted properly", level); - abort(); + return Status::Corruption("L" + NumberToString(level) + + " files are not sorted properly"); } // Make sure there is no overlap in levels > 0 @@ -190,20 +209,24 @@ class VersionBuilder::Rep { fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level, (f1->largest).DebugString(true).c_str(), (f2->smallest).DebugString(true).c_str()); - abort(); + return Status::Corruption( + "L" + NumberToString(level) + " have overlapping ranges " + + (f1->largest).DebugString(true) + " vs. " + + (f2->smallest).DebugString(true)); } } } } + return Status::OK(); } - void CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number, - int level) { + Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number, + int level) { #ifdef NDEBUG if (!base_vstorage_->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to - return; + return Status::OK(); } #endif // a file to be deleted better exist in the previous version @@ -241,8 +264,9 @@ class VersionBuilder::Rep { } if (!found) { fprintf(stderr, "not found %" PRIu64 "\n", number); - abort(); + return Status::Corruption("not found " + NumberToString(number)); } + return Status::OK(); } bool CheckConsistencyForNumLevels() { @@ -259,8 +283,11 @@ class VersionBuilder::Rep { } // Apply all of the edits in *edit to the current state. - void Apply(VersionEdit* edit) { - CheckConsistency(base_vstorage_); + Status Apply(VersionEdit* edit) { + Status s = CheckConsistency(base_vstorage_); + if (!s.ok()) { + return s; + } // Delete files const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); @@ -308,12 +335,20 @@ class VersionBuilder::Rep { } } } + return s; } // Save the current state in *v. - void SaveTo(VersionStorageInfo* vstorage) { - CheckConsistency(base_vstorage_); - CheckConsistency(vstorage); + Status SaveTo(VersionStorageInfo* vstorage) { + Status s = CheckConsistency(base_vstorage_); + if (!s.ok()) { + return s; + } + + s = CheckConsistency(vstorage); + if (!s.ok()) { + return s; + } for (int level = 0; level < num_levels_; level++) { const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; @@ -357,7 +392,8 @@ class VersionBuilder::Rep { } } - CheckConsistency(vstorage); + s = CheckConsistency(vstorage); + return s; } Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, @@ -475,23 +511,23 @@ VersionBuilder::VersionBuilder(const EnvOptions& env_options, VersionBuilder::~VersionBuilder() { delete rep_; } -void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { - rep_->CheckConsistency(vstorage); +Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { + return rep_->CheckConsistency(vstorage); } -void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, - uint64_t number, int level) { - rep_->CheckConsistencyForDeletes(edit, number, level); +Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, + uint64_t number, int level) { + return rep_->CheckConsistencyForDeletes(edit, number, level); } bool VersionBuilder::CheckConsistencyForNumLevels() { return rep_->CheckConsistencyForNumLevels(); } -void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } +Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); } -void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { - rep_->SaveTo(vstorage); +Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { + return rep_->SaveTo(vstorage); } Status VersionBuilder::LoadTableHandlers( diff --git a/db/version_builder.h b/db/version_builder.h index 168301fdd6..f5fd121897 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -27,12 +27,12 @@ class VersionBuilder { VersionBuilder(const EnvOptions& env_options, TableCache* table_cache, VersionStorageInfo* base_vstorage, Logger* info_log = nullptr); ~VersionBuilder(); - void CheckConsistency(VersionStorageInfo* vstorage); - void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, - int level); + Status CheckConsistency(VersionStorageInfo* vstorage); + Status CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, + int level); bool CheckConsistencyForNumLevels(); - void Apply(VersionEdit* edit); - void SaveTo(VersionStorageInfo* vstorage); + Status Apply(VersionEdit* edit); + Status SaveTo(VersionStorageInfo* vstorage); Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, diff --git a/db/version_set.cc b/db/version_set.cc index ea6b0ab6a1..e7d334825f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3622,7 +3622,14 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - LogAndApplyHelper(last_writer->cfd, builder, e, mu); + Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu); + if (!s.ok()) { + // free up the allocated memory + for (auto v : versions) { + delete v; + } + return s; + } batch_edits.push_back(e); } } @@ -3630,7 +3637,14 @@ Status VersionSet::ProcessManifestWrites( assert(!builder_guards.empty() && builder_guards.size() == versions.size()); auto* builder = builder_guards[i]->version_builder(); - builder->SaveTo(versions[i]->storage_info()); + Status s = builder->SaveTo(versions[i]->storage_info()); + if (!s.ok()) { + // free up the allocated memory + for (auto v : versions) { + delete v; + } + return s; + } } } @@ -4010,9 +4024,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } } -void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, - VersionBuilder* builder, VersionEdit* edit, - InstrumentedMutex* mu) { +Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, + VersionBuilder* builder, VersionEdit* edit, + InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif @@ -4036,7 +4050,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ : last_sequence_); - builder->Apply(edit); + Status s = builder->Apply(edit); + + return s; } Status VersionSet::ApplyOneVersionEditToBuilder( @@ -4129,7 +4145,10 @@ Status VersionSet::ApplyOneVersionEditToBuilder( // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->version_builder()->Apply(&edit); + Status s = builder->second->version_builder()->Apply(&edit); + if (!s.ok()) { + return s; + } } return ExtractInfoFromVersionEdit( cfd, edit, have_log_number, log_number, have_prev_log_number, @@ -4748,7 +4767,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->version_builder()->Apply(&edit); + s = builder->second->version_builder()->Apply(&edit); + if (!s.ok()) { + break; + } } if (cfd != nullptr && edit.has_log_number_) { @@ -5767,7 +5789,10 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( } active_version_builders_.erase(builder_iter); } else { - builder->Apply(&edit); + Status s = builder->Apply(&edit); + if (!s.ok()) { + return s; + } } Status s = ExtractInfoFromVersionEdit( cfd, edit, have_log_number, log_number, have_prev_log_number, diff --git a/db/version_set.h b/db/version_set.h index 028c6ad1ba..766e071f4e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1154,8 +1154,8 @@ class VersionSet { const ColumnFamilyOptions* new_cf_options); void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, - VersionEdit* edit, InstrumentedMutex* mu); + Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, + VersionEdit* edit, InstrumentedMutex* mu); }; // ReactiveVersionSet represents a collection of versions of the column