From fb09ef05dcee8fa0ccaebf88ef1992858ea5a083 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 20 Mar 2020 19:17:54 -0700 Subject: [PATCH] Attempt to recover from db with missing table files (#6334) Summary: There are situations when RocksDB tries to recover, but the db is in an inconsistent state due to SST files referenced in the MANIFEST being missing. In this case, previous RocksDB will just fail the recovery and return a non-ok status. This PR enables another possibility. During recovery, RocksDB checks possible MANIFEST files, and try to recover to the most recent state without missing table file. `VersionSet::Recover()` applies version edits incrementally and "materializes" a version only when this version does not reference any missing table file. After processing the entire MANIFEST, the version created last will be the latest version. `DBImpl::Recover()` calls `VersionSet::Recover()`. Afterwards, WAL replay will *not* be performed. To use this capability, set `options.best_efforts_recovery = true` when opening the db. Best-efforts recovery is currently incompatible with atomic flush. Test plan (on devserver): ``` $make check $COMPILE_WITH_ASAN=1 make all && make check ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6334 Reviewed By: anand1976 Differential Revision: D19778960 Pulled By: riversand963 fbshipit-source-id: c27ea80f29bc952e7d3311ecf5ee9c54393b40a8 --- CMakeLists.txt | 1 + HISTORY.md | 1 + TARGETS | 1 + db/column_family.cc | 8 +- db/column_family.h | 8 +- db/db_basic_test.cc | 152 ++++++ db/db_impl/db_impl.h | 11 + db/db_impl/db_impl_files.cc | 52 ++ db/db_impl/db_impl_open.cc | 22 +- db/version_builder.cc | 22 + db/version_builder.h | 17 + db/version_edit.h | 2 + db/version_edit_handler.cc | 576 +++++++++++++++++++++++ db/version_edit_handler.h | 123 +++++ db/version_set.cc | 219 ++++++++- db/version_set.h | 23 +- db/version_set_test.cc | 784 ++++++++++++++++++++++++++++++- env/mock_env.cc | 12 +- env/mock_env.h | 2 - file/filename.cc | 12 + file/filename.h | 8 + include/rocksdb/options.h | 10 + options/db_options.cc | 5 +- options/db_options.h | 1 + options/options_helper.cc | 4 + options/options_settable_test.cc | 3 +- src.mk | 1 + 27 files changed, 2014 insertions(+), 66 deletions(-) create mode 100644 db/version_edit_handler.cc create mode 100644 db/version_edit_handler.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f2d60fa42c..23a068ee73 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -565,6 +565,7 @@ set(SOURCES db/trim_history_scheduler.cc db/version_builder.cc db/version_edit.cc + db/version_edit_handler.cc db/version_set.cc db/wal_manager.cc db/write_batch.cc diff --git a/HISTORY.md b/HISTORY.md index 7208b07e93..af2d631cf0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ ### New Features * Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered. * When file lock failure when the lock is held by the current process, return acquiring time and thread ID in the error message. +* Added a new option, best_efforts_recovery (default: false), to allow database to open in a db dir with missing table files. During best efforts recovery, missing table files are ignored, and database recovers to the most recent state without missing table file. Cross-column-family consistency is not guaranteed even if WAL is enabled. ## 6.8.0 (02/24/2020) ### Java API Changes diff --git a/TARGETS b/TARGETS index d4ea69b001..429566eb6d 100644 --- a/TARGETS +++ b/TARGETS @@ -171,6 +171,7 @@ cpp_library( "db/trim_history_scheduler.cc", "db/version_builder.cc", "db/version_edit.cc", + "db/version_edit_handler.cc", "db/version_set.cc", "db/wal_manager.cc", "db/write_batch.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 407e684a19..299c97b798 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1397,8 +1397,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const FileOptions& file_options, Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller, + WriteBufferManager* _write_buffer_manager, + WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer) : max_column_family_(0), dummy_cfd_(new ColumnFamilyData( @@ -1410,8 +1410,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, db_options_(db_options), file_options_(file_options), table_cache_(table_cache), - write_buffer_manager_(write_buffer_manager), - write_controller_(write_controller), + write_buffer_manager_(_write_buffer_manager), + write_controller_(_write_controller), block_cache_tracer_(block_cache_tracer) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; diff --git a/db/column_family.h b/db/column_family.h index 342f6ba8c1..cd2546d72d 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -647,8 +647,8 @@ class ColumnFamilySet { ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const FileOptions& file_options, Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller, + WriteBufferManager* _write_buffer_manager, + WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer); ~ColumnFamilySet(); @@ -678,6 +678,10 @@ class ColumnFamilySet { Cache* get_table_cache() { return table_cache_; } + WriteBufferManager* write_buffer_manager() { return write_buffer_manager_; } + + WriteController* write_controller() { return write_controller_; } + private: friend class ColumnFamilyData; // helper function that gets called from cfd destructor diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index c33690e275..f67f04425d 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1734,6 +1734,158 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) { keys.data(), values.data(), statuses.data(), true); } +TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions write_opts; + write_opts.disableWAL = true; + for (size_t cf = 0; cf != num_cfs; ++cf) { + for (size_t i = 0; i != 10000; ++i) { + std::string key_str = Key(static_cast(i)); + std::string value_str = std::to_string(cf) + "_" + std::to_string(i); + + ASSERT_OK(Put(static_cast(cf), key_str, value_str)); + if (0 == (i % 1000)) { + ASSERT_OK(Flush(static_cast(cf))); + } + } + } + for (size_t cf = 0; cf != num_cfs; ++cf) { + ASSERT_OK(Flush(static_cast(cf))); + } + Close(); + options.best_efforts_recovery = true; + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"}, + options); + num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + for (size_t cf = 0; cf != num_cfs; ++cf) { + for (int i = 0; i != 10000; ++i) { + std::string key_str = Key(static_cast(i)); + std::string expected_value_str = + std::to_string(cf) + "_" + std::to_string(i); + ASSERT_EQ(expected_value_str, Get(static_cast(cf), key_str)); + } + } +} + +namespace { +class TableFileListener : public EventListener { + public: + void OnTableFileCreated(const TableFileCreationInfo& info) override { + InstrumentedMutexLock lock(&mutex_); + cf_to_paths_[info.cf_name].push_back(info.file_path); + } + std::vector& GetFiles(const std::string& cf_name) { + InstrumentedMutexLock lock(&mutex_); + return cf_to_paths_[cf_name]; + } + + private: + InstrumentedMutex mutex_; + std::unordered_map> cf_to_paths_; +}; +} // namespace + +TEST_F(DBBasicTest, RecoverWithMissingFiles) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + TableFileListener* listener = new TableFileListener(); + // Disable auto compaction to simplify SST file name tracking. + options.disable_auto_compactions = true; + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + std::vector all_cf_names = {kDefaultColumnFamilyName, "pikachu", + "eevee"}; + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + for (size_t cf = 0; cf != num_cfs; ++cf) { + ASSERT_OK(Put(static_cast(cf), "a", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + ASSERT_OK(Put(static_cast(cf), "b", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + ASSERT_OK(Put(static_cast(cf), "c", "0_value")); + ASSERT_OK(Flush(static_cast(cf))); + } + + // Delete files + for (size_t i = 0; i < all_cf_names.size(); ++i) { + std::vector& files = listener->GetFiles(all_cf_names[i]); + ASSERT_EQ(3, files.size()); + for (int j = static_cast(files.size() - 1); j >= static_cast(i); + --j) { + ASSERT_OK(env_->DeleteFile(files[j])); + } + } + options.best_efforts_recovery = true; + ReopenWithColumnFamilies(all_cf_names, options); + // Verify data + ReadOptions read_opts; + read_opts.total_order_seek = true; + { + std::unique_ptr iter(db_->NewIterator(read_opts, handles_[0])); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[1])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[2])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("b", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + } +} + +TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + TableFileListener* listener = new TableFileListener(); + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"pikachu"}, options); + std::vector kAllCfNames = {kDefaultColumnFamilyName, "pikachu"}; + size_t num_cfs = handles_.size(); + ASSERT_EQ(2, num_cfs); + for (int cf = 0; cf < static_cast(kAllCfNames.size()); ++cf) { + ASSERT_OK(Put(cf, "a", "0_value")); + ASSERT_OK(Flush(cf)); + ASSERT_OK(Put(cf, "b", "0_value")); + } + // Delete files + for (size_t i = 0; i < kAllCfNames.size(); ++i) { + std::vector& files = listener->GetFiles(kAllCfNames[i]); + ASSERT_EQ(1, files.size()); + for (int j = static_cast(files.size() - 1); j >= static_cast(i); + --j) { + ASSERT_OK(env_->DeleteFile(files[j])); + } + } + options.best_efforts_recovery = true; + ReopenWithColumnFamilies(kAllCfNames, options); + // Verify WAL is not applied + ReadOptions read_opts; + read_opts.total_order_seek = true; + std::unique_ptr iter(db_->NewIterator(read_opts, handles_[0])); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + iter.reset(db_->NewIterator(read_opts, handles_[1])); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a", iter->key()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); +} + class DBBasicTestWithParallelIO : public DBTestBase, public testing::WithParamInterface> { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a22b209819..978ee8c9c6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1124,6 +1124,17 @@ class DBImpl : public DB { virtual bool OwnTablesAndLogs() const { return true; } + // REQUIRES: db mutex held when calling this function, but the db mutex can + // be released and re-acquired. Db mutex will be held when the function + // returns. + // Currently, this function should be called only in best-efforts recovery + // mode. + // After best-efforts recovery, there may be SST files in db/cf paths that are + // not referenced in the MANIFEST. We delete these SST files. In the + // meantime, we find out the largest file number present in the paths, and + // bump up the version set's next_file_number_ to be 1 + largest_file_number. + Status CleanupFilesAfterRecovery(); + private: friend class DB; friend class ErrorHandler; diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index c5d07dd01f..20381eaa92 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -14,6 +14,7 @@ #include "db/event_helpers.h" #include "db/memtable_list.h" #include "file/file_util.h" +#include "file/filename.h" #include "file/sst_file_manager_impl.h" #include "util/autovector.h" @@ -664,4 +665,55 @@ uint64_t PrecomputeMinLogNumberToKeep( return min_log_number_to_keep; } +Status DBImpl::CleanupFilesAfterRecovery() { + mutex_.AssertHeld(); + std::vector paths; + paths.push_back(dbname_); + for (const auto& db_path : immutable_db_options_.db_paths) { + paths.push_back(db_path.path); + } + for (const auto* cfd : *versions_->GetColumnFamilySet()) { + for (const auto& cf_path : cfd->ioptions()->cf_paths) { + paths.push_back(cf_path.path); + } + } + // Dedup paths + std::sort(paths.begin(), paths.end()); + paths.erase(std::unique(paths.begin(), paths.end()), paths.end()); + + uint64_t next_file_number = versions_->current_next_file_number(); + uint64_t largest_file_number = next_file_number; + std::set files_to_delete; + for (const auto& path : paths) { + std::vector files; + env_->GetChildren(path, &files); + for (const auto& fname : files) { + uint64_t number = 0; + FileType type; + if (!ParseFileName(fname, &number, &type)) { + continue; + } + const std::string normalized_fpath = NormalizePath(path + fname); + largest_file_number = std::max(largest_file_number, number); + if (type == kTableFile && number >= next_file_number && + files_to_delete.find(normalized_fpath) == files_to_delete.end()) { + files_to_delete.insert(normalized_fpath); + } + } + } + if (largest_file_number > next_file_number) { + versions_->next_file_number_.store(largest_file_number + 1); + } + mutex_.Unlock(); + Status s; + for (const auto& fname : files_to_delete) { + s = env_->DeleteFile(fname); + if (!s.ok()) { + break; + } + } + mutex_.Lock(); + return s; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 4318e5b75a..09728b6ab6 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -252,6 +252,12 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) { "atomic_flush is incompatible with enable_pipelined_write"); } + // TODO remove this restriction + if (db_options.atomic_flush && db_options.best_efforts_recovery) { + return Status::InvalidArgument( + "atomic_flush is currently incompatible with best-efforts recovery"); + } + return Status::OK(); } @@ -419,7 +425,17 @@ Status DBImpl::Recover( } } assert(db_id_.empty()); - Status s = versions_->Recover(column_families, read_only, &db_id_); + Status s; + bool missing_table_file = false; + if (!immutable_db_options_.best_efforts_recovery) { + s = versions_->Recover(column_families, read_only, &db_id_); + } else { + s = versions_->TryRecover(column_families, read_only, &db_id_, + &missing_table_file); + if (s.ok()) { + s = CleanupFilesAfterRecovery(); + } + } if (!s.ok()) { return s; } @@ -499,7 +515,9 @@ Status DBImpl::Recover( // attention to it in case we are recovering a database // produced by an older version of rocksdb. std::vector filenames; - s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); + if (!immutable_db_options_.best_efforts_recovery) { + s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); + } if (s.IsNotFound()) { return Status::InvalidArgument("wal_dir not found", immutable_db_options_.wal_dir); diff --git a/db/version_builder.cc b/db/version_builder.cc index d0ea16f0a6..cb4c5a2469 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -528,4 +528,26 @@ Status VersionBuilder::LoadTableHandlers( is_initial_load, prefix_extractor); } +BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( + ColumnFamilyData* cfd) + : version_builder_(new VersionBuilder( + cfd->current()->version_set()->file_options(), cfd->table_cache(), + cfd->current()->storage_info(), cfd->ioptions()->info_log)), + version_(cfd->current()) { + version_->Ref(); +} + +BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( + ColumnFamilyData* cfd, Version* v) + : version_builder_(new VersionBuilder( + cfd->current()->version_set()->file_options(), cfd->table_cache(), + v->storage_info(), cfd->ioptions()->info_log)), + version_(v) { + assert(version_ != cfd->current()); +} + +BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() { + version_->Unref(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_builder.h b/db/version_builder.h index a2c8e02947..1c75fee6d4 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -21,6 +21,8 @@ class VersionStorageInfo; class VersionEdit; struct FileMetaData; class InternalStats; +class Version; +class ColumnFamilyData; // A helper class so we can efficiently apply a whole sequence // of edits to a particular state without creating intermediate @@ -44,5 +46,20 @@ class VersionBuilder { std::unique_ptr rep_; }; +// A wrapper of version builder which references the current version in +// constructor and unref it in the destructor. +// Both of the constructor and destructor need to be called inside DB Mutex. +class BaseReferencedVersionBuilder { + public: + explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd); + BaseReferencedVersionBuilder(ColumnFamilyData* cfd, Version* v); + ~BaseReferencedVersionBuilder(); + VersionBuilder* version_builder() const { return version_builder_.get(); } + + private: + std::unique_ptr version_builder_; + Version* version_; +}; + extern bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b); } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit.h b/db/version_edit.h index 221a1efdb2..1ea38cf68c 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -438,6 +438,8 @@ class VersionEdit { private: friend class ReactiveVersionSet; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; friend class VersionSet; friend class Version; friend class AtomicGroupReadBuffer; diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc new file mode 100644 index 0000000000..44e21efa64 --- /dev/null +++ b/db/version_edit_handler.cc @@ -0,0 +1,576 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_edit_handler.h" + +#include "monitoring/persistent_stats_history.h" + +namespace ROCKSDB_NAMESPACE { + +VersionEditHandler::VersionEditHandler( + bool read_only, const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool no_error_if_table_files_missing) + : read_only_(read_only), + column_families_(column_families), + status_(), + version_set_(version_set), + track_missing_files_(track_missing_files), + no_error_if_table_files_missing_(no_error_if_table_files_missing), + initialized_(false) { + assert(version_set_ != nullptr); +} + +Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { + Slice record; + std::string scratch; + size_t recovered_edits = 0; + Status s = Initialize(); + while (reader.ReadRecord(&record, &scratch) && s.ok()) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + if (edit.has_db_id_) { + version_set_->db_id_ = edit.GetDbId(); + if (db_id != nullptr) { + *db_id = version_set_->db_id_; + } + } + s = read_buffer_.AddEdit(&edit); + if (!s.ok()) { + break; + } + ColumnFamilyData* cfd = nullptr; + if (edit.is_in_atomic_group_) { + if (read_buffer_.IsFull()) { + for (auto& e : read_buffer_.replay_buffer()) { + s = ApplyVersionEdit(e, &cfd); + if (!s.ok()) { + break; + } + ++recovered_edits; + } + if (!s.ok()) { + break; + } + read_buffer_.Clear(); + } + } else { + s = ApplyVersionEdit(edit, &cfd); + if (s.ok()) { + ++recovered_edits; + } + } + } + + CheckIterationResult(reader, &s); + + if (!s.ok()) { + status_ = s; + } + return s; +} + +Status VersionEditHandler::Initialize() { + Status s; + if (!initialized_) { + for (const auto& cf_desc : column_families_) { + name_to_options_.emplace(cf_desc.name, cf_desc.options); + } + auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName); + if (default_cf_iter == name_to_options_.end()) { + s = Status::InvalidArgument("Default column family not specified"); + } + if (s.ok()) { + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* cfd = + CreateCfAndInit(default_cf_iter->second, default_cf_edit); + assert(cfd != nullptr); +#ifdef NDEBUG + (void)cfd; +#endif + initialized_ = true; + } + } + return s; +} + +Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** cfd) { + Status s; + if (edit.is_column_family_add_) { + s = OnColumnFamilyAdd(edit, cfd); + } else if (edit.is_column_family_drop_) { + s = OnColumnFamilyDrop(edit, cfd); + } else { + s = OnNonCfOperation(edit, cfd); + } + if (s.ok()) { + assert(cfd != nullptr); + s = ExtractInfoFromVersionEdit(*cfd, edit); + } + return s; +} + +Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + Status s; + if (cf_in_builders || cf_in_not_found) { + s = Status::Corruption("MANIFEST adding the same column family twice: " + + edit.column_family_name_); + } + if (s.ok()) { + auto cf_options = name_to_options_.find(edit.column_family_name_); + // implicitly add persistent_stats column family without requiring user + // to specify + ColumnFamilyData* tmp_cfd = nullptr; + bool is_persistent_stats_column_family = + edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0; + if (cf_options == name_to_options_.end() && + !is_persistent_stats_column_family) { + column_families_not_found_.emplace(edit.column_family_, + edit.column_family_name_); + } else { + if (is_persistent_stats_column_family) { + ColumnFamilyOptions cfo; + OptimizeForPersistentStats(&cfo); + tmp_cfd = CreateCfAndInit(cfo, edit); + } else { + tmp_cfd = CreateCfAndInit(cf_options->second, edit); + } + *cfd = tmp_cfd; + } + } + return s; +} + +Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + ColumnFamilyData* tmp_cfd = nullptr; + Status s; + if (cf_in_builders) { + tmp_cfd = DestroyCfAndCleanup(edit); + } else if (cf_in_not_found) { + column_families_not_found_.erase(edit.column_family_); + } else { + s = Status::Corruption("MANIFEST - dropping non-existing column family"); + } + *cfd = tmp_cfd; + return s; +} + +Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, + ColumnFamilyData** cfd) { + bool cf_in_not_found = false; + bool cf_in_builders = false; + CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); + + assert(cfd != nullptr); + *cfd = nullptr; + Status s; + if (!cf_in_not_found) { + if (!cf_in_builders) { + s = Status::Corruption( + "MANIFEST record referencing unknown column family"); + } + ColumnFamilyData* tmp_cfd = nullptr; + if (s.ok()) { + auto builder_iter = builders_.find(edit.column_family_); + assert(builder_iter != builders_.end()); + tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( + edit.column_family_); + assert(tmp_cfd != nullptr); + s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false); + if (s.ok()) { + s = builder_iter->second->version_builder()->Apply(&edit); + } + } + *cfd = tmp_cfd; + } + return s; +} + +// TODO maybe cache the computation result +bool VersionEditHandler::HasMissingFiles() const { + bool ret = false; + for (const auto& elem : cf_to_missing_files_) { + const auto& missing_files = elem.second; + if (!missing_files.empty()) { + ret = true; + break; + } + } + return ret; +} + +void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit, + bool* cf_in_not_found, + bool* cf_in_builders) const { + assert(cf_in_not_found != nullptr); + assert(cf_in_builders != nullptr); + // Not found means that user didn't supply that column + // family option AND we encountered column family add + // record. Once we encounter column family drop record, + // we will delete the column family from + // column_families_not_found. + bool in_not_found = column_families_not_found_.find(edit.column_family_) != + column_families_not_found_.end(); + // in builders means that user supplied that column family + // option AND that we encountered column family add record + bool in_builders = builders_.find(edit.column_family_) != builders_.end(); + // They cannot both be true + assert(!(in_not_found && in_builders)); + *cf_in_not_found = in_not_found; + *cf_in_builders = in_builders; +} + +void VersionEditHandler::CheckIterationResult(const log::Reader& reader, + Status* s) { + assert(s != nullptr); + if (!s->ok()) { + read_buffer_.Clear(); + } else if (!version_edit_params_.has_log_number_ || + !version_edit_params_.has_next_file_number_ || + !version_edit_params_.has_last_sequence_) { + std::string msg("no "); + if (!version_edit_params_.has_log_number_) { + msg.append("log_file_number, "); + } + if (!version_edit_params_.has_next_file_number_) { + msg.append("next_file_number, "); + } + if (!version_edit_params_.has_last_sequence_) { + msg.append("last_sequence, "); + } + msg = msg.substr(0, msg.size() - 2); + msg.append(" entry in MANIFEST"); + *s = Status::Corruption(msg); + } + if (s->ok() && !read_only_ && !column_families_not_found_.empty()) { + std::string msg; + for (const auto& cf : column_families_not_found_) { + msg.append(", "); + msg.append(cf.second); + } + msg = msg.substr(2); + *s = Status::InvalidArgument("Column families not opened: " + msg); + } + if (s->ok()) { + version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( + version_edit_params_.max_column_family_); + version_set_->MarkMinLogNumberToKeep2PC( + version_edit_params_.min_log_number_to_keep_); + version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); + version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); + for (auto* cfd : *(version_set_->GetColumnFamilySet())) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + *s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + if (s->ok()) { + for (auto* cfd : *(version_set_->GetColumnFamilySet())) { + if (cfd->IsDropped()) { + continue; + } + if (read_only_) { + cfd->table_cache()->SetTablesAreImmortal(); + } + *s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false, + /*is_initial_load=*/true); + if (!s->ok()) { + break; + } + } + } + if (s->ok()) { + for (auto* cfd : *(version_set_->column_family_set_)) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + VersionEdit edit; + *s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true); + if (!s->ok()) { + break; + } + } + } + if (s->ok()) { + version_set_->manifest_file_size_ = reader.GetReadOffset(); + assert(version_set_->manifest_file_size_ > 0); + version_set_->next_file_number_.store( + version_edit_params_.next_file_number_ + 1); + version_set_->last_allocated_sequence_ = + version_edit_params_.last_sequence_; + version_set_->last_published_sequence_ = + version_edit_params_.last_sequence_; + version_set_->last_sequence_ = version_edit_params_.last_sequence_; + version_set_->prev_log_number_ = version_edit_params_.prev_log_number_; + } +} + +ColumnFamilyData* VersionEditHandler::CreateCfAndInit( + const ColumnFamilyOptions& cf_options, const VersionEdit& edit) { + ColumnFamilyData* cfd = version_set_->CreateColumnFamily(cf_options, &edit); + assert(cfd != nullptr); + cfd->set_initialized(); + assert(builders_.find(edit.column_family_) == builders_.end()); + builders_.emplace(edit.column_family_, + VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd))); + if (track_missing_files_) { + cf_to_missing_files_.emplace(edit.column_family_, + std::unordered_set()); + } + return cfd; +} + +ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup( + const VersionEdit& edit) { + auto builder_iter = builders_.find(edit.column_family_); + assert(builder_iter != builders_.end()); + builders_.erase(builder_iter); + if (track_missing_files_) { + auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_); + assert(missing_files_iter != cf_to_missing_files_.end()); + cf_to_missing_files_.erase(missing_files_iter); + } + ColumnFamilyData* ret = + version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_); + assert(ret != nullptr); + if (ret->UnrefAndTryDelete()) { + ret = nullptr; + } else { + assert(false); + } + return ret; +} + +Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, + ColumnFamilyData* cfd, + bool force_create_version) { + assert(cfd->initialized()); + if (force_create_version) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + auto* v = new Version(cfd, version_set_, version_set_->file_options_, + *cfd->GetLatestMutableCFOptions(), + version_set_->current_version_number_++); + builder->SaveTo(v->storage_info()); + // Install new version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(version_set_->db_options_->skip_stats_update_on_db_open)); + version_set_->AppendVersion(cfd, v); + } + return Status::OK(); +} + +Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load) { + assert(cfd != nullptr); + assert(!cfd->IsDropped()); + Status s; + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + assert(builder_iter->second != nullptr); + VersionBuilder* builder = builder_iter->second->version_builder(); + assert(builder); + s = builder->LoadTableHandlers( + cfd->internal_stats(), + version_set_->db_options_->max_file_opening_threads, + prefetch_index_and_filter_in_cache, is_initial_load, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (s.IsPathNotFound() && no_error_if_table_files_missing_) { + s = Status::OK(); + } + if (!s.ok() && !version_set_->db_options_->paranoid_checks) { + s = Status::OK(); + } + return s; +} + +Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, + const VersionEdit& edit) { + Status s; + if (cfd != nullptr) { + if (edit.has_db_id_) { + version_edit_params_.SetDBId(edit.db_id_); + } + if (edit.has_log_number_) { + if (cfd->GetLogNumber() > edit.log_number_) { + ROCKS_LOG_WARN( + version_set_->db_options()->info_log, + "MANIFEST corruption detected, but ignored - Log numbers in " + "records NOT monotonically increasing"); + } else { + cfd->SetLogNumber(edit.log_number_); + version_edit_params_.SetLogNumber(edit.log_number_); + } + } + if (edit.has_comparator_ && + edit.comparator_ != cfd->user_comparator()->Name()) { + s = Status::InvalidArgument( + cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + } + } + + if (s.ok()) { + if (edit.has_prev_log_number_) { + version_edit_params_.SetPrevLogNumber(edit.prev_log_number_); + } + if (edit.has_next_file_number_) { + version_edit_params_.SetNextFile(edit.next_file_number_); + } + if (edit.has_max_column_family_) { + version_edit_params_.SetMaxColumnFamily(edit.max_column_family_); + } + if (edit.has_min_log_number_to_keep_) { + version_edit_params_.min_log_number_to_keep_ = + std::max(version_edit_params_.min_log_number_to_keep_, + edit.min_log_number_to_keep_); + } + if (edit.has_last_sequence_) { + version_edit_params_.SetLastSequence(edit.last_sequence_); + } + if (!version_edit_params_.has_prev_log_number_) { + version_edit_params_.SetPrevLogNumber(0); + } + } + return s; +} + +VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( + bool read_only, const std::vector& column_families, + VersionSet* version_set) + : VersionEditHandler(read_only, column_families, version_set, + /*track_missing_files=*/true, + /*no_error_if_table_files_missing=*/true) {} + +VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() { + for (const auto& elem : versions_) { + delete elem.second; + } + versions_.clear(); +} + +void VersionEditHandlerPointInTime::CheckIterationResult( + const log::Reader& reader, Status* s) { + VersionEditHandler::CheckIterationResult(reader, s); + assert(s != nullptr); + if (s->ok()) { + for (auto* cfd : *(version_set_->column_family_set_)) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + assert(v_iter->second != nullptr); + + version_set_->AppendVersion(cfd, v_iter->second); + versions_.erase(v_iter); + } + } + } +} + +ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup( + const VersionEdit& edit) { + ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit); + auto v_iter = versions_.find(edit.column_family_); + if (v_iter != versions_.end()) { + delete v_iter->second; + versions_.erase(v_iter); + } + return cfd; +} + +Status VersionEditHandlerPointInTime::MaybeCreateVersion( + const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) { + assert(cfd != nullptr); + if (!force_create_version) { + assert(edit.column_family_ == cfd->GetID()); + } + auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID()); + assert(missing_files_iter != cf_to_missing_files_.end()); + std::unordered_set& missing_files = missing_files_iter->second; + const bool prev_has_missing_files = !missing_files.empty(); + for (const auto& file : edit.GetDeletedFiles()) { + uint64_t file_num = file.second; + auto fiter = missing_files.find(file_num); + if (fiter != missing_files.end()) { + missing_files.erase(fiter); + } + } + Status s; + for (const auto& elem : edit.GetNewFiles()) { + const FileMetaData& meta = elem.second; + const FileDescriptor& fd = meta.fd; + uint64_t file_num = fd.GetNumber(); + const std::string fpath = + MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num); + s = version_set_->VerifyFileMetadata(fpath, meta); + if (s.IsPathNotFound() || s.IsNotFound()) { + missing_files.insert(file_num); + s = Status::OK(); + } + } + bool missing_info = !version_edit_params_.has_log_number_ || + !version_edit_params_.has_next_file_number_ || + !version_edit_params_.has_last_sequence_; + + // Create version before apply edit + if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) || + (missing_files.empty() && force_create_version))) { + auto builder_iter = builders_.find(cfd->GetID()); + assert(builder_iter != builders_.end()); + auto* builder = builder_iter->second->version_builder(); + auto* version = new Version(cfd, version_set_, version_set_->file_options_, + *cfd->GetLatestMutableCFOptions(), + version_set_->current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply( + *cfd->GetLatestMutableCFOptions(), + !version_set_->db_options_->skip_stats_update_on_db_open); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + delete v_iter->second; + v_iter->second = version; + } else { + versions_.emplace(cfd->GetID(), version); + } + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h new file mode 100644 index 0000000000..7df940d6fd --- /dev/null +++ b/db/version_edit_handler.h @@ -0,0 +1,123 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "db/version_builder.h" +#include "db/version_edit.h" +#include "db/version_set.h" + +namespace ROCKSDB_NAMESPACE { + +typedef std::unique_ptr VersionBuilderUPtr; + +// A class used for scanning MANIFEST file. +// VersionEditHandler reads a MANIFEST file, parses the version edits, and +// builds the version set's in-memory state, e.g. the version storage info for +// the versions of column families. +// To use this class and its subclasses, +// 1. Create an object of VersionEditHandler or its subclasses. +// VersionEditHandler handler(read_only, column_families, version_set, +// track_missing_files, ignore_missing_files); +// 2. Status s = handler.Iterate(reader, &db_id); +// 3. Check s and handle possible errors. +// +// Not thread-safe, external synchronization is necessary if an object of +// VersionEditHandler is shared by multiple threads. +class VersionEditHandler { + public: + explicit VersionEditHandler( + bool read_only, + const std::vector& column_families, + VersionSet* version_set, bool track_missing_files, + bool ignore_missing_files); + + virtual ~VersionEditHandler() {} + + Status Iterate(log::Reader& reader, std::string* db_id); + + const Status& status() const { return status_; } + + bool HasMissingFiles() const; + + protected: + Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnColumnFamilyDrop(VersionEdit& edit, ColumnFamilyData** cfd); + + Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd); + + Status Initialize(); + + void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found, + bool* cf_in_builders) const; + + virtual void CheckIterationResult(const log::Reader& reader, Status* s); + + ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options, + const VersionEdit& edit); + + virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit); + + virtual Status MaybeCreateVersion(const VersionEdit& edit, + ColumnFamilyData* cfd, + bool force_create_version); + + Status LoadTables(ColumnFamilyData* cfd, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load); + + const bool read_only_; + const std::vector& column_families_; + Status status_; + VersionSet* version_set_; + AtomicGroupReadBuffer read_buffer_; + std::unordered_map builders_; + std::unordered_map name_to_options_; + std::unordered_map column_families_not_found_; + VersionEditParams version_edit_params_; + const bool track_missing_files_; + std::unordered_map> + cf_to_missing_files_; + bool no_error_if_table_files_missing_; + + private: + Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, + const VersionEdit& edit); + + bool initialized_; +}; + +// A class similar to its base class, i.e. VersionEditHandler. +// VersionEditHandlerPointInTime restores the versions to the most recent point +// in time such that at this point, the version does not have missing files. +// +// Not thread-safe, external synchronization is necessary if an object of +// VersionEditHandlerPointInTime is shared by multiple threads. +class VersionEditHandlerPointInTime : public VersionEditHandler { + public: + VersionEditHandlerPointInTime( + bool read_only, + const std::vector& column_families, + VersionSet* version_set); + ~VersionEditHandlerPointInTime() override; + + protected: + void CheckIterationResult(const log::Reader& reader, Status* s) override; + ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override; + Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd, + bool force_create_version) override; + + private: + std::unordered_map versions_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 99bc12cba6..6753a58d04 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -29,6 +29,7 @@ #include "db/pinned_iterators_manager.h" #include "db/table_cache.h" #include "db/version_builder.h" +#include "db/version_edit_handler.h" #include "file/filename.h" #include "file/random_access_file_reader.h" #include "file/read_write_util.h" @@ -1202,28 +1203,6 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } // anonymous namespace -// A wrapper of version builder which references the current version in -// constructor and unref it in the destructor. -// Both of the constructor and destructor need to be called inside DB Mutex. -class BaseReferencedVersionBuilder { - public: - explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) - : version_builder_(new VersionBuilder( - cfd->current()->version_set()->file_options(), cfd->table_cache(), - cfd->current()->storage_info(), cfd->ioptions()->info_log)), - version_(cfd->current()) { - version_->Ref(); - } - ~BaseReferencedVersionBuilder() { - version_->Unref(); - } - VersionBuilder* version_builder() { return version_builder_.get(); } - - private: - std::unique_ptr version_builder_; - Version* version_; -}; - Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, const std::string* fname) const { @@ -3570,6 +3549,33 @@ VersionSet::~VersionSet() { obsolete_files_.clear(); } +void VersionSet::Reset() { + if (column_family_set_) { + Cache* table_cache = column_family_set_->get_table_cache(); + WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); + WriteController* wc = column_family_set_->write_controller(); + column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_, + file_options_, table_cache, + wbm, wc, block_cache_tracer_)); + } + db_id_.clear(); + next_file_number_.store(2); + min_log_number_to_keep_2pc_.store(0); + manifest_file_number_ = 0; + options_file_number_ = 0; + pending_manifest_file_number_ = 0; + last_sequence_.store(0); + last_allocated_sequence_.store(0); + last_published_sequence_.store(0); + prev_log_number_ = 0; + descriptor_log_.reset(); + current_version_number_ = 0; + manifest_writers_.clear(); + manifest_file_size_ = 0; + obsolete_files_.clear(); + obsolete_manifests_.clear(); +} + void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { // compute new compaction score @@ -4452,8 +4458,6 @@ Status VersionSet::Recover( reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); - Slice record; - std::string scratch; AtomicGroupReadBuffer read_buffer; s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, column_families_not_found, builders, @@ -4581,6 +4585,159 @@ Status VersionSet::Recover( return s; } +namespace { +class ManifestPicker { + public: + explicit ManifestPicker(const std::string& dbname, FileSystem* fs); + void SeekToFirstManifest(); + // REQUIRES Valid() == true + std::string GetNextManifest(uint64_t* file_number, std::string* file_name); + bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); } + const Status& status() const { return status_; } + + private: + const std::string& dbname_; + FileSystem* const fs_; + // MANIFEST file names(s) + std::vector manifest_files_; + std::vector::const_iterator manifest_file_iter_; + Status status_; +}; + +ManifestPicker::ManifestPicker(const std::string& dbname, FileSystem* fs) + : dbname_(dbname), fs_(fs) {} + +void ManifestPicker::SeekToFirstManifest() { + assert(fs_ != nullptr); + std::vector children; + Status s = fs_->GetChildren(dbname_, IOOptions(), &children, /*dbg=*/nullptr); + if (!s.ok()) { + status_ = s; + return; + } + for (const auto& fname : children) { + uint64_t file_num = 0; + FileType file_type; + bool parse_ok = ParseFileName(fname, &file_num, &file_type); + if (parse_ok && file_type == kDescriptorFile) { + manifest_files_.push_back(fname); + } + } + std::sort(manifest_files_.begin(), manifest_files_.end(), + [](const std::string& lhs, const std::string& rhs) { + uint64_t num1 = 0; + uint64_t num2 = 0; + FileType type1; + FileType type2; + bool parse_ok1 = ParseFileName(lhs, &num1, &type1); + bool parse_ok2 = ParseFileName(rhs, &num2, &type2); +#ifndef NDEBUG + assert(parse_ok1); + assert(parse_ok2); +#else + (void)parse_ok1; + (void)parse_ok2; +#endif + return num1 > num2; + }); + manifest_file_iter_ = manifest_files_.begin(); +} + +std::string ManifestPicker::GetNextManifest(uint64_t* number, + std::string* file_name) { + assert(status_.ok()); + assert(Valid()); + std::string ret; + if (manifest_file_iter_ != manifest_files_.end()) { + ret.assign(dbname_); + if (ret.back() != kFilePathSeparator) { + ret.push_back(kFilePathSeparator); + } + ret.append(*manifest_file_iter_); + if (number) { + FileType type; + bool parse = ParseFileName(*manifest_file_iter_, number, &type); + assert(type == kDescriptorFile); +#ifndef NDEBUG + assert(parse); +#else + (void)parse; +#endif + } + if (file_name) { + *file_name = *manifest_file_iter_; + } + ++manifest_file_iter_; + } + return ret; +} +} // namespace + +Status VersionSet::TryRecover( + const std::vector& column_families, bool read_only, + std::string* db_id, bool* has_missing_table_file) { + ManifestPicker manifest_picker(dbname_, fs_); + manifest_picker.SeekToFirstManifest(); + Status s = manifest_picker.status(); + if (!s.ok()) { + return s; + } + if (!manifest_picker.Valid()) { + return Status::Corruption("Cannot locate MANIFEST file in " + dbname_); + } + std::string manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + while (!manifest_path.empty()) { + s = TryRecoverFromOneManifest(manifest_path, column_families, read_only, + db_id, has_missing_table_file); + if (s.ok() || !manifest_picker.Valid()) { + break; + } + Reset(); + manifest_path = + manifest_picker.GetNextManifest(&manifest_file_number_, nullptr); + } + return s; +} + +Status VersionSet::TryRecoverFromOneManifest( + const std::string& manifest_path, + const std::vector& column_families, bool read_only, + std::string* db_id, bool* has_missing_table_file) { + ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n", + manifest_path.c_str()); + std::unique_ptr manifest_file_reader; + Status s; + { + std::unique_ptr manifest_file; + s = fs_->NewSequentialFile(manifest_path, + fs_->OptimizeForManifestRead(file_options_), + &manifest_file, nullptr); + if (!s.ok()) { + return s; + } + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path, + db_options_->log_readahead_size)); + } + + VersionSet::LogReporter reporter; + reporter.status = &s; + log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, + /*checksum=*/true, /*log_num=*/0); + { + VersionEditHandlerPointInTime handler_pit(read_only, column_families, + const_cast(this)); + + s = handler_pit.Iterate(reader, db_id); + + assert(nullptr != has_missing_table_file); + *has_missing_table_file = handler_pit.HasMissingFiles(); + } + + return s; +} + Status VersionSet::ListColumnFamilies(std::vector* column_families, const std::string& dbname, FileSystem* fs) { @@ -5518,7 +5675,7 @@ void VersionSet::GetObsoleteFiles(std::vector* files, } ColumnFamilyData* VersionSet::CreateColumnFamily( - const ColumnFamilyOptions& cf_options, VersionEdit* edit) { + const ColumnFamilyOptions& cf_options, const VersionEdit* edit) { assert(edit->is_column_family_add_); MutableCFOptions dummy_cf_options; @@ -5573,6 +5730,18 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) { return total_files_size; } +Status VersionSet::VerifyFileMetadata(const std::string& fpath, + const FileMetaData& meta) const { + uint64_t fsize = 0; + Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr); + if (status.ok()) { + if (fsize != meta.fd.GetFileSize()) { + status = Status::Corruption("File size mismatch: " + fpath); + } + } + return status; +} + ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, const ImmutableDBOptions* _db_options, const FileOptions& _file_options, diff --git a/db/version_set.h b/db/version_set.h index a6629e89d6..b33a7c6e8e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -688,6 +688,8 @@ class Version { FileSystem* fs_; friend class ReactiveVersionSet; friend class VersionSet; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; const InternalKeyComparator* internal_comparator() const { return storage_info_.internal_comparator_; @@ -876,6 +878,17 @@ class VersionSet { Status Recover(const std::vector& column_families, bool read_only = false, std::string* db_id = nullptr); + Status TryRecover(const std::vector& column_families, + bool read_only, std::string* db_id, + bool* has_missing_table_file); + + // Try to recover the version set to the most recent consistent state + // recorded in the specified manifest. + Status TryRecoverFromOneManifest( + const std::string& manifest_path, + const std::vector& column_families, + bool read_only, std::string* db_id, bool* has_missing_table_file); + // Reads a manifest file and returns a list of column families in // column_families. static Status ListColumnFamilies(std::vector* column_families, @@ -1059,6 +1072,8 @@ class VersionSet { struct ManifestWriter; friend class Version; + friend class VersionEditHandler; + friend class VersionEditHandlerPointInTime; friend class DBImpl; friend class DBImplReadOnly; @@ -1069,6 +1084,8 @@ class VersionSet { } }; + void Reset(); + // Returns approximated offset of a key in a file for a given version. uint64_t ApproximateOffsetOf(Version* v, const FdWithKeyRange& f, const Slice& key, TableReaderCaller caller); @@ -1091,7 +1108,7 @@ class VersionSet { void AppendVersion(ColumnFamilyData* column_family_data, Version* v); ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, - VersionEdit* edit); + const VersionEdit* edit); Status ReadAndRecover( log::Reader* reader, AtomicGroupReadBuffer* read_buffer, @@ -1115,8 +1132,10 @@ class VersionSet { const VersionEdit& from_edit, VersionEditParams* version_edit_params); - std::unique_ptr column_family_set_; + Status VerifyFileMetadata(const std::string& fpath, + const FileMetaData& meta) const; + std::unique_ptr column_family_set_; Env* const env_; FileSystem* const fs_; const std::string dbname_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 03e0e26d27..e877273d22 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include "db/db_impl/db_impl.h" #include "db/log_writer.h" +#include "env/mock_env.h" #include "logging/logging.h" #include "table/mock_table.h" #include "test_util/testharness.h" @@ -611,16 +612,37 @@ class VersionSetTestBase { const static std::string kColumnFamilyName3; int num_initial_edits_; - VersionSetTestBase() - : env_(Env::Default()), - fs_(std::make_shared(env_)), - dbname_(test::PerThreadDBPath("version_set_test")), - db_options_(), + explicit VersionSetTestBase(const std::string& name) + : mem_env_(nullptr), + env_(nullptr), + env_guard_(), + fs_(), + dbname_(test::PerThreadDBPath(name)), + options_(), + db_options_(options_), + cf_options_(options_), + immutable_cf_options_(db_options_, cf_options_), mutable_cf_options_(cf_options_), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), mock_table_factory_(std::make_shared()) { + const char* test_env_uri = getenv("TEST_ENV_URI"); + Env* base_env = nullptr; + if (test_env_uri) { + Status s = Env::LoadEnv(test_env_uri, &base_env, &env_guard_); + EXPECT_OK(s); + EXPECT_NE(Env::Default(), base_env); + } else { + base_env = Env::Default(); + } + EXPECT_NE(nullptr, base_env); + if (getenv("MEM_ENV")) { + mem_env_ = new MockEnv(base_env); + } + env_ = mem_env_ ? mem_env_ : base_env; + + fs_ = std::make_shared(env_); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.env = env_; @@ -628,23 +650,38 @@ class VersionSetTestBase { versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr)), - reactive_versions_ = std::make_shared( - dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_); + /*block_cache_tracer=*/nullptr)); + reactive_versions_ = std::make_shared( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); } - void PrepareManifest(std::vector* column_families, - SequenceNumber* last_seqno, - std::unique_ptr* log_writer) { + virtual ~VersionSetTestBase() { + if (getenv("KEEP_DB")) { + fprintf(stdout, "DB is still at %s\n", dbname_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(dbname_, options)); + } + if (mem_env_) { + delete mem_env_; + mem_env_ = nullptr; + } + } + + protected: + virtual void PrepareManifest( + std::vector* column_families, + SequenceNumber* last_seqno, std::unique_ptr* log_writer) { assert(column_families != nullptr); assert(last_seqno != nullptr); assert(log_writer != nullptr); VersionEdit new_db; if (db_options_.write_dbid_to_manifest) { - DBImpl* impl = new DBImpl(DBOptions(), dbname_); + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); std::string db_id; impl->GetDbIdentityFromIdentityFile(&db_id); new_db.SetDBId(db_id); @@ -715,12 +752,25 @@ class VersionSetTestBase { versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); } + void VerifyManifest(std::string* manifest_path) const { + assert(manifest_path != nullptr); + uint64_t manifest_file_number = 0; + Status s = versions_->GetCurrentManifestPath( + dbname_, fs_.get(), manifest_path, &manifest_file_number); + ASSERT_OK(s); + ASSERT_EQ(1, manifest_file_number); + } + + MockEnv* mem_env_; Env* env_; + std::shared_ptr env_guard_; std::shared_ptr fs_; const std::string dbname_; EnvOptions env_options_; + Options options_; ImmutableDBOptions db_options_; ColumnFamilyOptions cf_options_; + ImmutableCFOptions immutable_cf_options_; MutableCFOptions mutable_cf_options_; std::shared_ptr table_cache_; WriteController write_controller_; @@ -738,7 +788,7 @@ const std::string VersionSetTestBase::kColumnFamilyName3 = "charles"; class VersionSetTest : public VersionSetTestBase, public testing::Test { public: - VersionSetTest() : VersionSetTestBase() {} + VersionSetTest() : VersionSetTestBase("version_set_test") {} }; TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { @@ -780,7 +830,8 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { class VersionSetAtomicGroupTest : public VersionSetTestBase, public testing::Test { public: - VersionSetAtomicGroupTest() : VersionSetTestBase() {} + VersionSetAtomicGroupTest() + : VersionSetTestBase("version_set_atomic_group_test") {} void SetUp() override { PrepareManifest(&column_families_, &last_seqno_, &log_writer_); @@ -1164,7 +1215,8 @@ TEST_F(VersionSetAtomicGroupTest, class VersionSetTestDropOneCF : public VersionSetTestBase, public testing::TestWithParam { public: - VersionSetTestDropOneCF() : VersionSetTestBase() {} + VersionSetTestDropOneCF() + : VersionSetTestBase("version_set_test_drop_one_cf") {} }; // This test simulates the following execution sequence @@ -1279,6 +1331,706 @@ INSTANTIATE_TEST_CASE_P( testing::Values(VersionSetTestBase::kColumnFamilyName1, VersionSetTestBase::kColumnFamilyName2, VersionSetTestBase::kColumnFamilyName3)); + +class EmptyDefaultCfNewManifest : public VersionSetTestBase, + public testing::Test { + public: + EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {} + // Emulate DBImpl::NewDB() + void PrepareManifest(std::vector* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr* log_writer) override { + assert(log_writer != nullptr); + VersionEdit new_db; + new_db.SetLogNumber(0); + std::unique_ptr file; + const std::string manifest_path = DescriptorFileName(dbname_, 1); + Status s = env_->NewWritableFile( + manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer( + new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), + manifest_path, env_options_)); + log_writer->reset(new log::Writer(std::move(file_writer), 0, true)); + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + // Create new column family + VersionEdit new_cf; + new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf.SetColumnFamily(1); + new_cf.SetLastSequence(2); + new_cf.SetNextFile(2); + record.clear(); + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + + protected: + bool write_dbid_to_manifest_ = false; + std::unique_ptr log_writer_; +}; + +// Create db, create column family. Cf creation will switch to a new MANIFEST. +// Then reopen db, trying to recover. +TEST_F(EmptyDefaultCfNewManifest, Recover) { + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); + column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1, + cf_options_); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest( + manifest_path, column_families, false, &db_id, &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); +} + +class VersionSetTestEmptyDb + : public VersionSetTestBase, + public testing::TestWithParam< + std::tuple>> { + public: + static const std::string kUnknownColumnFamilyName; + VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {} + + protected: + void PrepareManifest(std::vector* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr* log_writer) override { + assert(nullptr != log_writer); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + const std::string manifest_path = DescriptorFileName(dbname_, 1); + std::unique_ptr file; + Status s = env_->NewWritableFile( + manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer( + new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), + manifest_path, env_options_)); + { + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + std::string record; + new_db.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + } + + std::unique_ptr log_writer_; +}; + +const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown"; + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector cf_names = std::get<2>(GetParam()); + + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Only a subset of column families in the MANIFEST. + VersionEdit new_cf1; + new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf1.SetColumnFamily(1); + Status s; + { + std::string record; + new_cf1.EncodeTo(&record); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetColumnFamily(4); + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. + const std::vector all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector& cf_names = std::get<2>(GetParam()); + std::vector column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else if (read_only) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() == all_cf_names.size()) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() < all_cf_names.size()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily( + kUnknownColumnFamilyName); + ASSERT_EQ(nullptr, cfd); + } +} + +INSTANTIATE_TEST_CASE_P( + BestEffortRecovery, VersionSetTestEmptyDb, + testing::Combine( + /*write_dbid_to_manifest=*/testing::Bool(), + /*read_only=*/testing::Bool(), + /*cf_names=*/ + testing::Values( + std::vector(), + std::vector({kDefaultColumnFamilyName}), + std::vector({VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1}), + std::vector({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector( + {kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3, + VersionSetTestEmptyDb::kUnknownColumnFamilyName})))); + +class VersionSetTestMissingFiles : public VersionSetTestBase, + public testing::Test { + public: + VersionSetTestMissingFiles() + : VersionSetTestBase("version_set_test_missing_files"), + block_based_table_options_(), + table_factory_(std::make_shared( + block_based_table_options_)), + internal_comparator_( + std::make_shared(options_.comparator)) {} + + protected: + void PrepareManifest(std::vector* column_families, + SequenceNumber* last_seqno, + std::unique_ptr* log_writer) override { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); + const std::string manifest = DescriptorFileName(dbname_, 1); + std::unique_ptr file; + Status s = env_->NewWritableFile( + manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr file_writer(new WritableFileWriter( + NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + std::unique_ptr impl(new DBImpl(DBOptions(), dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + { + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + const std::vector cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; // default cf id is 0 + cf_options_.table_factory = table_factory_; + for (const auto& cf_name : cf_names) { + column_families->emplace_back(cf_name, cf_options_); + if (cf_name == kDefaultColumnFamilyName) { + continue; + } + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_name); + new_cf.SetColumnFamily(cf_id); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + + VersionEdit cf_files; + cf_files.SetColumnFamily(cf_id); + cf_files.SetLogNumber(0); + record.clear(); + ASSERT_TRUE(cf_files.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + ++cf_id; + } + SequenceNumber seq = 2; + { + VersionEdit edit; + edit.SetNextFile(7); + edit.SetLastSequence(seq); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + *last_seqno = seq + 1; + } + + struct SstInfo { + uint64_t file_number; + std::string column_family; + std::string key; // the only key + int level = 0; + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key) + : SstInfo(file_num, cf_name, _key, 0) {} + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key, int lvl) + : file_number(file_num), + column_family(cf_name), + key(_key), + level(lvl) {} + }; + + // Create dummy sst, return their metadata. Note that only file name and size + // are used. + void CreateDummyTableFiles(const std::vector& file_infos, + std::vector* file_metas) { + assert(file_metas != nullptr); + for (const auto& info : file_infos) { + uint64_t file_num = info.file_number; + std::string fname = MakeTableFileName(dbname_, file_num); + std::unique_ptr file; + Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr fwriter( + new WritableFileWriter(std::move(file), fname, FileOptions(), env_)); + std::vector> + int_tbl_prop_collector_factories; + + std::unique_ptr builder(table_factory_->NewTableBuilder( + TableBuilderOptions( + immutable_cf_options_, mutable_cf_options_, *internal_comparator_, + &int_tbl_prop_collector_factories, kNoCompression, + /*_sample_for_compression=*/0, CompressionOptions(), + /*_skip_filters=*/false, info.column_family, info.level), + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, + fwriter.get())); + InternalKey ikey(info.key, 0, ValueType::kTypeValue); + builder->Add(ikey.Encode(), "value"); + ASSERT_OK(builder->Finish()); + fwriter->Flush(); + uint64_t file_size = 0; + s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); + ASSERT_OK(s); + ASSERT_NE(0, file_size); + FileMetaData meta; + meta = FileMetaData(file_num, /*file_path_id=*/0, file_size, ikey, ikey, + 0, 0, false, 0, 0, 0, kUnknownFileChecksum, + kUnknownFileChecksumFuncName); + file_metas->emplace_back(meta); + } + } + + // This method updates last_sequence_. + void WriteFileAdditionAndDeletionToManifest( + uint32_t cf, const std::vector>& added_files, + const std::vector>& deleted_files) { + VersionEdit edit; + edit.SetColumnFamily(cf); + for (const auto& elem : added_files) { + int level = elem.first; + edit.AddFile(level, elem.second); + } + for (const auto& elem : deleted_files) { + int level = elem.first; + edit.DeleteFile(level, elem.second); + } + edit.SetLastSequence(last_seqno_); + ++last_seqno_; + assert(log_writer_.get() != nullptr); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + Status s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + + BlockBasedTableOptions block_based_table_options_; + std::shared_ptr table_factory_; + std::shared_ptr internal_comparator_; + std::vector column_families_; + SequenceNumber last_seqno_; + std::unique_ptr log_writer_; +}; + +TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (uint64_t file_num = 10; file_num < 15; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = + FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12, + smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + std::vector> deleted_files; + deleted_files.emplace_back(0, 10); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + ASSERT_TRUE(files.empty()); + } +} + +TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (size_t i = 3; i != 5; ++i) { + added_files.emplace_back(0, file_metas[i]); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + + added_files.clear(); + for (uint64_t file_num = 120; file_num < 130; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = + FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12, + smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(2, files.size()); + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) { + ASSERT_FALSE(true); + } + } + } else { + ASSERT_TRUE(files.empty()); + } + } +} + +TEST_F(VersionSetTestMissingFiles, NoFileMissing) { + std::vector existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector> added_files; + for (const auto& meta : file_metas) { + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector>()); + std::vector> deleted_files; + deleted_files.emplace_back(/*level=*/0, 100); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size()); + bool has_deleted_file = false; + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() == 100) { + has_deleted_file = true; + break; + } + } + ASSERT_FALSE(has_deleted_file); + } else { + ASSERT_TRUE(files.empty()); + } + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/env/mock_env.cc b/env/mock_env.cc index ab249bbb8c..ea63cd16a8 100644 --- a/env/mock_env.cc +++ b/env/mock_env.cc @@ -10,6 +10,7 @@ #include "env/mock_env.h" #include #include +#include "file/filename.h" #include "port/sys_time.h" #include "util/cast_util.h" #include "util/murmurhash.h" @@ -747,17 +748,6 @@ Status MockEnv::CorruptBuffer(const std::string& fname) { return Status::OK(); } -std::string MockEnv::NormalizePath(const std::string path) { - std::string dst; - for (auto c : path) { - if (!dst.empty() && c == '/' && dst.back() == '/') { - continue; - } - dst.push_back(c); - } - return dst; -} - void MockEnv::FakeSleepForMicroseconds(int64_t micros) { fake_sleep_micros_.fetch_add(micros); } diff --git a/env/mock_env.h b/env/mock_env.h index 5cd9d499d4..1ed5c0b1f7 100644 --- a/env/mock_env.h +++ b/env/mock_env.h @@ -106,8 +106,6 @@ class MockEnv : public EnvWrapper { void FakeSleepForMicroseconds(int64_t micros); private: - std::string NormalizePath(const std::string path); - // Map from filenames to MemFile objects, representing a simple file system. typedef std::map FileSystem; port::Mutex mutex_; diff --git a/file/filename.cc b/file/filename.cc index a68e14c90f..07cc083692 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -453,4 +453,16 @@ Status GetInfoLogFiles(Env* env, const std::string& db_log_dir, return Status::OK(); } +std::string NormalizePath(const std::string& path) { + std::string dst; + for (auto c : path) { + if (!dst.empty() && c == kFilePathSeparator && + dst.back() == kFilePathSeparator) { + continue; + } + dst.push_back(c); + } + return dst; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/filename.h b/file/filename.h index 201429cf5f..9a4be0a530 100644 --- a/file/filename.h +++ b/file/filename.h @@ -29,6 +29,12 @@ class Env; class Directory; class WritableFileWriter; +#ifdef OS_WIN +const char kFilePathSeparator = '\\'; +#else +const char kFilePathSeparator = '/'; +#endif + enum FileType { kLogFile, kDBLockFile, @@ -183,4 +189,6 @@ extern Status GetInfoLogFiles(Env* env, const std::string& db_log_dir, const std::string& dbname, std::string* parent_dir, std::vector* file_names); + +extern std::string NormalizePath(const std::string& path); } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f5d44fb743..1976cdd1c6 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1130,6 +1130,16 @@ struct DBOptions { // // Default: nullptr std::shared_ptr sst_file_checksum_func = nullptr; + + // By default, RocksDB recovery fails if any table file referenced in + // MANIFEST are missing after scanning the MANIFEST. + // Best-efforts recovery is another recovery mode that + // tries to restore the database to the most recent point in time without + // missing file. + // Currently not compatible with atomic flush. Furthermore, WAL files will + // not be used for recovery if best_efforts_recovery is true. + // Default: false + bool best_efforts_recovery = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index 0dcc8ef79f..4788977863 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -95,7 +95,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) persist_stats_to_disk(options.persist_stats_to_disk), write_dbid_to_manifest(options.write_dbid_to_manifest), log_readahead_size(options.log_readahead_size), - sst_file_checksum_func(options.sst_file_checksum_func) { + sst_file_checksum_func(options.sst_file_checksum_func), + best_efforts_recovery(options.best_efforts_recovery) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -250,6 +251,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { sst_file_checksum_func ? sst_file_checksum_func->Name() : kUnknownFileChecksumFuncName.c_str()); + ROCKS_LOG_HEADER(log, " Options.best_efforts_recovery: %d", + static_cast(best_efforts_recovery)); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index cba298a21f..3ab6b73eb1 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -88,6 +88,7 @@ struct ImmutableDBOptions { bool write_dbid_to_manifest; size_t log_readahead_size; std::shared_ptr sst_file_checksum_func; + bool best_efforts_recovery; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index bc36dbcfe7..3ebbcee4e2 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -145,6 +145,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.avoid_unnecessary_blocking_io; options.log_readahead_size = immutable_db_options.log_readahead_size; options.sst_file_checksum_func = immutable_db_options.sst_file_checksum_func; + options.best_efforts_recovery = immutable_db_options.best_efforts_recovery; return options; } @@ -1681,6 +1682,9 @@ std::unordered_map {"log_readahead_size", {offsetof(struct DBOptions, log_readahead_size), OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}}, + {"best_efforts_recovery", + {offsetof(struct DBOptions, best_efforts_recovery), + OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, }; std::unordered_map diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index c5fee7b3e5..057d772bf3 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -303,7 +303,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "atomic_flush=false;" "avoid_unnecessary_blocking_io=false;" "log_readahead_size=0;" - "write_dbid_to_manifest=false", + "write_dbid_to_manifest=false;" + "best_efforts_recovery=false", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/src.mk b/src.mk index 89558cf961..dd32180515 100644 --- a/src.mk +++ b/src.mk @@ -59,6 +59,7 @@ LIB_SOURCES = \ db/trim_history_scheduler.cc \ db/version_builder.cc \ db/version_edit.cc \ + db/version_edit_handler.cc \ db/version_set.cc \ db/wal_manager.cc \ db/write_batch.cc \