diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc index 86bf8cc7c5..febc5ae4a4 100644 --- a/db/db_follower_test.cc +++ b/db/db_follower_test.cc @@ -17,6 +17,7 @@ class DBFollowerTest : public DBTestBase { // Create the leader DB object DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) { follower_name_ = dbname_ + "/follower"; + db_parent_ = dbname_; Close(); Destroy(CurrentOptions()); EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); @@ -27,17 +28,215 @@ class DBFollowerTest : public DBTestBase { ~DBFollowerTest() { follower_.reset(); EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + Destroy(CurrentOptions()); + dbname_ = db_parent_; } protected: + class DBFollowerTestFS : public FileSystemWrapper { + public: + explicit DBFollowerTestFS(const std::shared_ptr<FileSystem>& target) + : FileSystemWrapper(target), + cv_(&mutex_), + barrier_(false), + count_(0), + reinit_count_(0) {} + + const char* Name() const override { return "DBFollowerTestFS"; } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr<FSSequentialFile>* result, + IODebugContext* dbg = nullptr) override { + class DBFollowerTestSeqFile : public FSSequentialFileWrapper { + public: + DBFollowerTestSeqFile(DBFollowerTestFS* fs, + std::unique_ptr<FSSequentialFile>&& file, + uint64_t /*size*/) + : FSSequentialFileWrapper(file.get()), + fs_(fs), + file_(std::move(file)) {} + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override { + fs_->BarrierWait(); + return target()->Read(n, options, result, scratch, dbg); + } + + private: + DBFollowerTestFS* fs_; + std::unique_ptr<FSSequentialFile> file_; + }; + + std::unique_ptr<FSSequentialFile> file; + IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg); + + if (s.ok() && test::GetFileType(fname) == kDescriptorFile) { + uint64_t size = 0; + EXPECT_EQ(target()->GetFileSize(fname, IOOptions(), &size, nullptr), + IOStatus::OK()); + result->reset(new DBFollowerTestSeqFile(this, std::move(file), size)); + } else { + *result = std::move(file); + } + return s; + } + + void BarrierInit(int count) { + MutexLock l(&mutex_); + barrier_ = true; + count_ = count; + } + + void BarrierWait() { + MutexLock l(&mutex_); + if (!barrier_) { + return; + } + if (--count_ == 0) { + if (reinit_count_ > 0) { + count_ = reinit_count_; + reinit_count_ = 0; + } else { + barrier_ = false; + } + cv_.SignalAll(); + } else { + cv_.Wait(); + } + } + + void BarrierWaitAndReinit(int count) { + MutexLock l(&mutex_); + if (!barrier_) { + return; + } + reinit_count_ = count; + if (--count_ == 0) { + if (reinit_count_ > 0) { + count_ = reinit_count_; + reinit_count_ = 0; + } else { + barrier_ = false; + } + cv_.SignalAll(); + } else { + cv_.Wait(); + } + } + + private: + port::Mutex mutex_; + port::CondVar cv_; + bool barrier_; + int count_; + int reinit_count_; + }; + + class DBFollowerTestSstPartitioner : public SstPartitioner { + public: + explicit DBFollowerTestSstPartitioner(uint64_t max_keys) + : max_keys_(max_keys), num_keys_(0) {} + + const char* Name() const override { return "DBFollowerTestSstPartitioner"; } + + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + if (++num_keys_ > max_keys_) { + num_keys_ = 0; + return PartitionerResult::kRequired; + } else { + return PartitionerResult::kNotRequired; + } + } + + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return true; + } + + private: + uint64_t max_keys_; +
uint64_t num_keys_; + }; + + class DBFollowerTestSstPartitionerFactory : public SstPartitionerFactory { + public: + explicit DBFollowerTestSstPartitionerFactory(uint64_t max_keys) + : max_keys_(max_keys) {} + + std::unique_ptr<SstPartitioner> CreatePartitioner( + const SstPartitioner::Context& /*context*/) const override { + std::unique_ptr<SstPartitioner> partitioner; + partitioner.reset(new DBFollowerTestSstPartitioner(max_keys_)); + return partitioner; + } + + const char* Name() const override { + return "DBFollowerTestSstPartitionerFactory"; + } + + private: + uint64_t max_keys_; + }; + Status OpenAsFollower() { - return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, - &follower_); + Options opts = CurrentOptions(); + if (!follower_env_) { + follower_env_ = NewCompositeEnv( + std::make_shared<DBFollowerTestFS>(env_->GetFileSystem())); + } + opts.env = follower_env_.get(); + opts.follower_refresh_catchup_period_ms = 100; + return DB::OpenAsFollower(opts, follower_name_, dbname_, &follower_); } + + std::string FollowerGet(const std::string& k) { + ReadOptions options; + options.verify_checksums = true; + std::string result; + Status s = follower()->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + DB* follower() { return follower_.get(); } + DBFollowerTestFS* follower_fs() { + return static_cast<DBFollowerTestFS*>(follower_env_->GetFileSystem().get()); + } + + void CheckDirs() { + std::vector<std::string> db_children; + std::vector<std::string> follower_children; + EXPECT_OK(env_->GetChildren(dbname_, &db_children)); + EXPECT_OK(env_->GetChildren(follower_name_, &follower_children)); + + std::set<uint64_t> db_filenums; + std::set<uint64_t> follower_filenums; + for (auto& name : db_children) { + if (test::GetFileType(name) != kTableFile) { + continue; + } + db_filenums.insert(test::GetFileNumber(name)); + } + for (auto& name : follower_children) { + if (test::GetFileType(name) != kTableFile) { + continue; + } + follower_filenums.insert(test::GetFileNumber(name)); + } + db_filenums.merge(follower_filenums); + EXPECT_EQ(follower_filenums.size(), db_filenums.size()); + } private: std::string follower_name_; + std::string db_parent_; + std::unique_ptr<Env> follower_env_; std::unique_ptr<DB> follower_; }; @@ -51,8 +250,273 @@ TEST_F(DBFollowerTest, Basic) { std::string val; ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); ASSERT_EQ(val, "v1"); + CheckDirs(); } +TEST_F(DBFollowerTest, Flush) { + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, + {"Leader::Done", "DBImplFollower::TryCatchupWithLeader:Begin2"}, + {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(OpenAsFollower()); + TEST_SYNC_POINT("Leader::Start"); + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + TEST_SYNC_POINT("Leader::Done"); + + TEST_SYNC_POINT("Follower::WaitForCatchup"); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); + CheckDirs(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test creates 4 L0 files, immediately followed by a compaction to L1.
+// The follower replays the 4 flush records from the MANIFEST unsuccessfully, +// and then successfully recovers a Version from the compaction record +TEST_F(DBFollowerTest, RetryCatchup) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + Reopen(opts); + + ASSERT_OK(OpenAsFollower()); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, + {"DBImpl::BackgroundCompaction:Start", + "DBImplFollower::TryCatchupWithLeader:Begin2"}, + {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1", + "DBImpl::BackgroundCompaction:BeforeCompaction"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"}, + {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Leader::Start"); + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v3")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v4")); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + + TEST_SYNC_POINT("Follower::WaitForCatchup"); + ASSERT_EQ(FollowerGet("k1"), "v4"); + CheckDirs(); + + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test validates the same scenario as the previous test, except there is a +// MANIFEST rollover between the flushes and compaction. The follower +// does not switch to a new MANIFEST in ReadAndApply. So it would require +// another round of refresh before catching up. +TEST_F(DBFollowerTest, RetryCatchupManifestRollover) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + Reopen(opts); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v3")); + ASSERT_OK(Flush()); + ASSERT_OK(OpenAsFollower()); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, + {"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"}, + {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1", + "Leader::Done"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"}, + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Leader::Start"); + ASSERT_OK(Put("k1", "v4")); + ASSERT_OK(Flush()); + + TEST_SYNC_POINT("Leader::Flushed"); + TEST_SYNC_POINT("Leader::Done"); + Reopen(opts); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:2"}, + }); + TEST_SYNC_POINT("Follower::WaitForCatchup:2"); + ASSERT_EQ(FollowerGet("k1"), "v4"); + + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test creates 4 L0 files and compacts them. The follower, during catchup, +// successfully instantiates 4 Versions corresponding to the 4 files (but +// doesn't install them yet), followed by deleting those 4 and adding a new +// file from compaction. The test verifies that the 4 L0 files are deleted +// correctly by the follower.
+// We use the Barrier* functions to ensure that the follower first sees the 4 +// L0 files and is able to link them, and then sees the compaction that +// obsoletes those L0 files (so those L0 files are intermediates that it has +// to explicitly delete). Without the barriers, it is possible that +// the follower reads the L0 records and compaction records from the MANIFEST +// in one read, which means those L0 files would have already been deleted +// by the leader and the follower cannot link to them. +TEST_F(DBFollowerTest, IntermediateObsoleteFiles) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + Reopen(opts); + ASSERT_OK(OpenAsFollower()); + + follower_fs()->BarrierInit(2); + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v3")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v4")); + ASSERT_OK(Flush()); + follower_fs()->BarrierWaitAndReinit(2); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + follower_fs()->BarrierWait(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + CheckDirs(); + ASSERT_EQ(FollowerGet("k1"), "v4"); +} + +// This test verifies a scenario where the follower can recover a Version +// partially (i.e., some of the additions cannot be found), and the files +// that are found are obsoleted by a subsequent VersionEdit. +TEST_F(DBFollowerTest, PartialVersionRecovery) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.sst_partitioner_factory = + std::make_shared<DBFollowerTestSstPartitionerFactory>(1); + Reopen(opts); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Put("k2", "v1")); + ASSERT_OK(Put("k3", "v1")); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k3", "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + ASSERT_OK(OpenAsFollower()); + ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(), + {{"max_compaction_bytes", "1"}})); + + follower_fs()->BarrierInit(2); + Slice key("k1"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + + follower_fs()->BarrierWaitAndReinit(2); + + // The second compaction input overlaps the previous compaction outputs + // by one file. This file is never added to VersionStorageInfo since it + // was added and deleted before the catchup completes. We later verify that + // the follower correctly deleted this file. + key = Slice("k3"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + follower_fs()->BarrierWait(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + CheckDirs(); + ASSERT_EQ(FollowerGet("k1"), "v2"); + ASSERT_EQ(FollowerGet("k2"), "v1"); + ASSERT_EQ(FollowerGet("k3"), "v2"); + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test verifies a scenario similar to PartialVersionRecovery, except +// with a MANIFEST rollover in between. When there is a rollover, the +// follower's attempt ends without installing a new Version. The next catchup +// attempt will recover a full Version.
+TEST_F(DBFollowerTest, PartialVersionRecoveryWithRollover) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.sst_partitioner_factory = + std::make_shared<DBFollowerTestSstPartitionerFactory>(1); + Reopen(opts); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Put("k2", "v1")); + ASSERT_OK(Put("k3", "v1")); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k3", "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + opts.max_compaction_bytes = 1; + Reopen(opts); + + ASSERT_OK(OpenAsFollower()); + + follower_fs()->BarrierInit(2); + Slice key("k1"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + + follower_fs()->BarrierWaitAndReinit(2); + Reopen(opts); + key = Slice("k3"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + follower_fs()->BarrierWait(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", + "Follower::WaitForCatchup:1"}, + {"Follower::WaitForCatchup:2", + "DBImplFollower::TryCatchupWithLeader:Begin2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + TEST_SYNC_POINT("Follower::WaitForCatchup:2"); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:3"}, + }); + TEST_SYNC_POINT("Follower::WaitForCatchup:3"); + CheckDirs(); + ASSERT_EQ(FollowerGet("k1"), "v2"); + ASSERT_EQ(FollowerGet("k2"), "v1"); + ASSERT_EQ(FollowerGet("k3"), "v2"); + SyncPoint::GetInstance()->DisableProcessing(); +} #endif } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8192269ed2..40acbc96f7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1648,6 +1648,7 @@ class DBImpl : public DB { friend class ForwardIterator; friend struct SuperVersion; friend class CompactedDBImpl; + friend class DBImplFollower; #ifndef NDEBUG friend class DBTest_ConcurrentFlushWAL_Test; friend class DBTest_MixedSlowdownOptionsStop_Test; diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc index 8d21f530cd..a8c736f1bf 100644 --- a/db/db_impl/db_impl_follower.cc +++ b/db/db_impl/db_impl_follower.cc @@ -95,17 +95,28 @@ Status DBImplFollower::TryCatchUpWithLeader() { assert(versions_.get() != nullptr); assert(manifest_reader_.get() != nullptr); Status s; + + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin1"); + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin2"); // read the manifest and apply new changes to the follower instance std::unordered_set<ColumnFamilyData*> cfds_changed; JobContext job_context(0, true /*create_superversion*/); { InstrumentedMutexLock lock_guard(&mutex_); + std::vector<std::string> files_to_delete; s = static_cast_with_check<ReactiveVersionSet>(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, - manifest_reader_status_.get(), &cfds_changed); + manifest_reader_status_.get(), &cfds_changed, + &files_to_delete); + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_); + pending_outputs_inserted_elem_.reset(new std::list<uint64_t>::iterator( + CaptureCurrentFileNumberInPendingOutputs())); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, static_cast<uint64_t>(versions_->LastSequence())); + ROCKS_LOG_INFO( + immutable_db_options_.info_log, "Next file number is %" PRIu64, + static_cast<uint64_t>(versions_->current_next_file_number())); for (ColumnFamilyData* cfd : cfds_changed) { if (cfd->IsDropped()) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is
dropped\n", @@ -147,9 +158,33 @@ Status DBImplFollower::TryCatchUpWithLeader() { sv_context.NewSuperVersion(); } } + + for (auto& file : files_to_delete) { + IOStatus io_s = fs_->DeleteFile(file, IOOptions(), nullptr); + if (!io_s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Cannot delete file %s: %s", file.c_str(), + io_s.ToString().c_str()); + } + } } job_context.Clean(); + // Cleanup unused, obsolete files. + JobContext purge_files_job_context(0); + { + InstrumentedMutexLock lock_guard(&mutex_); + // Currently, follower instance does not create any database files, thus + // is unnecessary for the follower to force full scan. + FindObsoleteFiles(&purge_files_job_context, /*force=*/false); + } + if (purge_files_job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(purge_files_job_context); + } + purge_files_job_context.Clean(); + + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:End"); + return s; } @@ -199,6 +234,8 @@ Status DBImplFollower::Close() { catch_up_thread_.reset(); } + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_); + return DBImpl::Close(); } diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h index 60992c111e..374c60d5c6 100644 --- a/db/db_impl/db_impl_follower.h +++ b/db/db_impl/db_impl_follower.h @@ -27,7 +27,7 @@ class DBImplFollower : public DBImplSecondary { bool OwnTablesAndLogs() const override { // TODO: Change this to true once we've properly implemented file // deletion for the read scaling case - return false; + return true; } Status Recover(const std::vector& column_families, @@ -49,5 +49,6 @@ class DBImplFollower : public DBImplSecondary { std::string src_path_; port::Mutex mu_; port::CondVar cv_; + std::unique_ptr::iterator> pending_outputs_inserted_elem_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index a5509c4f12..92944d1181 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -680,7 +680,8 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { InstrumentedMutexLock lock_guard(&mutex_); s = static_cast_with_check(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, - manifest_reader_status_.get(), &cfds_changed); + manifest_reader_status_.get(), &cfds_changed, + /*files_to_delete=*/nullptr); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, static_cast(versions_->LastSequence())); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 42bdcd3c33..3284768046 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -152,7 +152,7 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit, VersionEditHandler::VersionEditHandler( bool read_only, std::vector column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr& io_tracer, const ReadOptions& read_options, bool skip_load_table_files, EpochNumberRequirement epoch_number_requirement) @@ -160,7 +160,7 @@ VersionEditHandler::VersionEditHandler( read_only_(read_only), column_families_(std::move(column_families)), version_set_(version_set), - track_missing_files_(track_missing_files), + track_found_and_missing_files_(track_found_and_missing_files), no_error_if_files_missing_(no_error_if_files_missing), io_tracer_(io_tracer), skip_load_table_files_(skip_load_table_files), @@ -500,7 +500,8 @@ ColumnFamilyData* 
VersionEditHandler::CreateCfAndInit( assert(builders_.find(cf_id) == builders_.end()); builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd))); - if (track_missing_files_) { + if (track_found_and_missing_files_) { + cf_to_found_files_.emplace(cf_id, std::unordered_set<uint64_t>()); cf_to_missing_files_.emplace(cf_id, std::unordered_set<uint64_t>()); cf_to_missing_blob_files_high_.emplace(cf_id, kInvalidBlobFileNumber); } @@ -513,7 +514,11 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup( auto builder_iter = builders_.find(cf_id); assert(builder_iter != builders_.end()); builders_.erase(builder_iter); - if (track_missing_files_) { + if (track_found_and_missing_files_) { + auto found_files_iter = cf_to_found_files_.find(cf_id); + assert(found_files_iter != cf_to_found_files_.end()); + cf_to_found_files_.erase(found_files_iter); + auto missing_files_iter = cf_to_missing_files_.find(cf_id); assert(missing_files_iter != cf_to_missing_files_.end()); cf_to_missing_files_.erase(missing_files_iter); @@ -729,7 +734,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( const ReadOptions& read_options, EpochNumberRequirement epoch_number_requirement) : VersionEditHandler(read_only, column_families, version_set, - /*track_missing_files=*/true, + /*track_found_and_missing_files=*/true, /*no_error_if_files_missing=*/true, io_tracer, read_options, epoch_number_requirement) {} @@ -824,6 +829,12 @@ void VersionEditHandlerPointInTime::CheckIterationResult( version_set_->AppendVersion(cfd, v_iter->second); versions_.erase(v_iter); + // Clear found_files, since any files in it are now part of the + // installed Version. Any files that got obsoleted would have already + // been moved to intermediate_files_. + auto found_files_iter = cf_to_found_files_.find(cfd->GetID()); + assert(found_files_iter != cf_to_found_files_.end()); + found_files_iter->second.clear(); } } } else { @@ -854,10 +865,16 @@ ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup( Status VersionEditHandlerPointInTime::MaybeCreateVersion( const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) { + TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1"); + TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"); assert(cfd != nullptr); if (!force_create_version) { assert(edit.GetColumnFamily() == cfd->GetID()); } + auto found_files_iter = cf_to_found_files_.find(cfd->GetID()); + assert(found_files_iter != cf_to_found_files_.end()); + std::unordered_set<uint64_t>& found_files = found_files_iter->second; + auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID()); assert(missing_files_iter != cf_to_missing_files_.end()); std::unordered_set<uint64_t>& missing_files = missing_files_iter->second; @@ -889,6 +906,18 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( auto fiter = missing_files.find(file_num); if (fiter != missing_files.end()) { missing_files.erase(fiter); + } else { + fiter = found_files.find(file_num); + // Only mark new files added during this catchup attempt for deletion. + // These files were never installed in VersionStorageInfo. + // Already referenced files that are deleted by a VersionEdit will + // be added to the VersionStorageInfo's obsolete files when the old + // version is dereferenced.
+ if (fiter != found_files.end()) { + intermediate_files_.emplace_back( + MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num)); + found_files.erase(fiter); + } } } @@ -904,9 +933,14 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( s = VerifyFile(cfd, fpath, level, meta); if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) { missing_files.insert(file_num); + if (s.IsCorruption()) { + found_files.insert(file_num); + } s = Status::OK(); } else if (!s.ok()) { break; + } else { + found_files.insert(file_num); } } diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 4caa9c0898..c918c6ec98 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -104,7 +104,7 @@ using VersionBuilderUPtr = std::unique_ptr<BaseReferencedVersionBuilder>; // To use this class and its subclasses, // 1. Create an object of VersionEditHandler or its subclasses. // VersionEditHandler handler(read_only, column_families, version_set, -// track_missing_files, +// track_found_and_missing_files, // no_error_if_files_missing); // 2. Status s = handler.Iterate(reader, &db_id); // 3. Check s and handle possible errors. @@ -116,16 +116,17 @@ class VersionEditHandler : public VersionEditHandlerBase { explicit VersionEditHandler( bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer, const ReadOptions& read_options, EpochNumberRequirement epoch_number_requirement = EpochNumberRequirement::kMustPresent) - : VersionEditHandler( - read_only, column_families, version_set, track_missing_files, - no_error_if_files_missing, io_tracer, read_options, - /*skip_load_table_files=*/false, epoch_number_requirement) {} + : VersionEditHandler(read_only, column_families, version_set, + track_found_and_missing_files, + no_error_if_files_missing, io_tracer, read_options, + /*skip_load_table_files=*/false, + epoch_number_requirement) {} ~VersionEditHandler() override {} @@ -144,7 +145,7 @@ class VersionEditHandler : public VersionEditHandlerBase { protected: explicit VersionEditHandler( bool read_only, std::vector<ColumnFamilyDescriptor> column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer, const ReadOptions& read_options, bool skip_load_table_files, @@ -195,7 +196,8 @@ class VersionEditHandler : public VersionEditHandlerBase { // by subsequent manifest records, Recover() will return failure status.
std::unordered_map<uint32_t, std::string> column_families_not_found_; VersionEditParams version_edit_params_; - const bool track_missing_files_; + const bool track_found_and_missing_files_; + std::unordered_map<uint32_t, std::unordered_set<uint64_t>> cf_to_found_files_; std::unordered_map<uint32_t, std::unordered_set<uint64_t>> cf_to_missing_files_; std::unordered_map<uint32_t, uint64_t> cf_to_missing_blob_files_high_; @@ -273,6 +275,8 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { bool in_atomic_group_ = false; + std::vector<std::string> intermediate_files_; + private: bool AtomicUpdateVersionsCompleted(); bool AtomicUpdateVersionsContains(uint32_t cfid); @@ -310,6 +314,10 @@ class ManifestTailer : public VersionEditHandlerPointInTime { return cfds_changed_; } + std::vector<std::string>& GetIntermediateFiles() { + return intermediate_files_; + } + protected: Status Initialize() override; @@ -342,7 +350,7 @@ class DumpManifestHandler : public VersionEditHandler { bool json) : VersionEditHandler( /*read_only=*/true, column_families, version_set, - /*track_missing_files=*/false, + /*track_found_and_missing_files=*/false, /*no_error_if_files_missing=*/false, io_tracer, read_options, /*skip_load_table_files=*/true), verbose_(verbose), diff --git a/db/version_set.cc b/db/version_set.cc index ffcf210dc4..f8dfff5bd8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6063,8 +6063,8 @@ Status VersionSet::Recover( true /* checksum */, 0 /* log_number */); VersionEditHandler handler( read_only, column_families, const_cast<VersionSet*>(this), - /*track_missing_files=*/false, no_error_if_files_missing, io_tracer_, - read_options, EpochNumberRequirement::kMightMissing); + /*track_found_and_missing_files=*/false, no_error_if_files_missing, + io_tracer_, read_options, EpochNumberRequirement::kMightMissing); handler.Iterate(reader, &log_read_status); s = handler.status(); if (s.ok()) { @@ -7439,7 +7439,8 @@ Status ReactiveVersionSet::ReadAndApply( InstrumentedMutex* mu, std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, Status* manifest_read_status, - std::unordered_set<ColumnFamilyData*>* cfds_changed) { + std::unordered_set<ColumnFamilyData*>* cfds_changed, + std::vector<std::string>* files_to_delete) { assert(manifest_reader != nullptr); assert(cfds_changed != nullptr); mu->AssertHeld(); @@ -7456,6 +7457,9 @@ Status ReactiveVersionSet::ReadAndApply( if (s.ok()) { *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies()); } + if (files_to_delete) { + *files_to_delete = std::move(manifest_tailer_->GetIntermediateFiles()); + } return s; } diff --git a/db/version_set.h b/db/version_set.h index ec98c77402..ba86878d40 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1741,7 +1741,8 @@ class ReactiveVersionSet : public VersionSet { InstrumentedMutex* mu, std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, Status* manifest_read_status, - std::unordered_set<ColumnFamilyData*>* cfds_changed); + std::unordered_set<ColumnFamilyData*>* cfds_changed, + std::vector<std::string>* files_to_delete); Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families, std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index f6a983d6b2..d4b748db7b 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -2742,7 +2742,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set<ColumnFamilyData*> cfds_changed; mu.Lock(); EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_TRUE(last_in_atomic_group_); @@ -2797,7 +2798,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set<ColumnFamilyData*> cfds_changed; mu.Lock();
EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); // Reactive version set should be empty now. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); @@ -2826,7 +2828,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set<ColumnFamilyData*> cfds_changed; mu.Lock(); EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_FALSE(last_in_atomic_group_); @@ -2882,7 +2885,8 @@ TEST_F(VersionSetAtomicGroupTest, AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); EXPECT_NOK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), corrupted_edit_.DebugString()); @@ -2932,7 +2936,8 @@ TEST_F(VersionSetAtomicGroupTest, AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); EXPECT_NOK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_EQ(edits_[1].DebugString(), edit_with_incorrect_group_size_.DebugString()); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 5372126ef5..ab926cef21 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -565,6 +565,30 @@ void DeleteDir(Env* env, const std::string& dirname) { TryDeleteDir(env, dirname).PermitUncheckedError(); } +FileType GetFileType(const std::string& path) { + FileType type = kTempFile; + std::size_t found = path.find_last_of('/'); + if (found == std::string::npos) { + found = 0; + } + std::string file_name = path.substr(found); + uint64_t number = 0; + ParseFileName(file_name, &number, &type); + return type; +} + +uint64_t GetFileNumber(const std::string& path) { + FileType type = kTempFile; + std::size_t found = path.find_last_of('/'); + if (found == std::string::npos) { + found = 0; + } + std::string file_name = path.substr(found); + uint64_t number = 0; + ParseFileName(file_name, &number, &type); + return number; +} + Status CreateEnvFromSystem(const ConfigOptions& config_options, Env** result, std::shared_ptr<Env>* guard) { const char* env_uri = getenv("TEST_ENV_URI"); diff --git a/test_util/testutil.h b/test_util/testutil.h index b3fa0954cb..02accdc521 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -882,6 +882,12 @@ Status TryDeleteDir(Env* env, const std::string& dirname); // Delete a directory if it exists void DeleteDir(Env* env, const std::string& dirname); +// Find the FileType from the file path +FileType GetFileType(const std::string& path); + +// Get the file number given the file path +uint64_t GetFileNumber(const std::string& path); + // Creates an Env from the system environment by looking at the system // environment variables. Status CreateEnvFromSystem(const ConfigOptions& options, Env** result,
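For context, a minimal sketch of how an application might open a follower instance against a leader's data directory, based on the DB::OpenAsFollower() call and the follower_refresh_catchup_period_ms option exercised in the test fixture above. The paths and the refresh period are illustrative placeholders, not part of this patch.

#include <memory>
#include <string>
#include "rocksdb/db.h"

using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::Status;

int main() {
  Options options;
  // How often the follower tails the leader's MANIFEST to catch up
  // (illustrative value, mirroring OpenAsFollower() in the test fixture).
  options.follower_refresh_catchup_period_ms = 100;

  // The follower keeps its own directory (first path) and follows the
  // leader's db directory (second path); both paths are placeholders.
  std::unique_ptr<DB> follower;
  Status s = DB::OpenAsFollower(options, "/path/to/follower_dir",
                                "/path/to/leader_dir", &follower);
  if (!s.ok()) {
    return 1;
  }

  // Reads observe whatever state the follower has caught up to so far.
  std::string value;
  s = follower->Get(ReadOptions(), "some_key", &value);
  return (s.ok() || s.IsNotFound()) ? 0 : 1;
}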