From 0ed93552f4cd6004e966815e1c18347e01628830 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 17 May 2024 19:13:33 -0700 Subject: [PATCH] Implement obsolete file deletion (GC) in follower (#12657) Summary: This PR implements deletion of obsolete files in a follower RocksDB instance. The follower tails the leader's MANIFEST and creates links to newly added SST files. These links need to be deleted once those files become obsolete in order to reclaim space. There are three cases to be considered - 1. New files added and links created, but the Version could not be installed due to some missing files. Those links need to be preserved so a subsequent catch up attempt can succeed. We insert the next file number in the `VersionSet` to `pending_outputs_` to prevent their deletion. 2. Files deleted from the previous successfully installed `Version`. These are deleted as usual in `PurgeObsoleteFiles`. 3. New files added by a `VersionEdit` and deleted by a subsequent `VersionEdit`, both processed in the same catchup attempt. Links will be created for the new files when verifying a candidate `Version`. Those need to be deleted explicitly as they're never added to `VersionStorageInfo`, and thus not deleted by `PurgeObsoleteFiles`. Test plan - New unit tests in `db_follower_test`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12657 Reviewed By: jowlyzhang Differential Revision: D57462697 Pulled By: anand1976 fbshipit-source-id: 898f15570638dd4930f839ffd31c560f9cb73916 --- db/db_follower_test.cc | 468 +++++++++++++++++++++++++++++++- db/db_impl/db_impl.h | 1 + db/db_impl/db_impl_follower.cc | 39 ++- db/db_impl/db_impl_follower.h | 3 +- db/db_impl/db_impl_secondary.cc | 3 +- db/version_edit_handler.cc | 44 ++- db/version_edit_handler.h | 26 +- db/version_set.cc | 10 +- db/version_set.h | 3 +- db/version_set_test.cc | 15 +- test_util/testutil.cc | 24 ++ test_util/testutil.h | 6 + 12 files changed, 614 insertions(+), 28 deletions(-) diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc index 86bf8cc7c5..febc5ae4a4 100644 --- a/db/db_follower_test.cc +++ b/db/db_follower_test.cc @@ -17,6 +17,7 @@ class DBFollowerTest : public DBTestBase { // Create the leader DB object DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) { follower_name_ = dbname_ + "/follower"; + db_parent_ = dbname_; Close(); Destroy(CurrentOptions()); EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); @@ -27,17 +28,215 @@ class DBFollowerTest : public DBTestBase { ~DBFollowerTest() { follower_.reset(); EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + Destroy(CurrentOptions()); + dbname_ = db_parent_; } protected: + class DBFollowerTestFS : public FileSystemWrapper { + public: + explicit DBFollowerTestFS(const std::shared_ptr& target) + : FileSystemWrapper(target), + cv_(&mutex_), + barrier_(false), + count_(0), + reinit_count_(0) {} + + const char* Name() const override { return "DBFollowerTestFS"; } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg = nullptr) override { + class DBFollowerTestSeqFile : public FSSequentialFileWrapper { + public: + DBFollowerTestSeqFile(DBFollowerTestFS* fs, + std::unique_ptr&& file, + uint64_t /*size*/) + : FSSequentialFileWrapper(file.get()), + fs_(fs), + file_(std::move(file)) {} + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override { + fs_->BarrierWait(); + return 
target()->Read(n, options, result, scratch, dbg); + } + + private: + DBFollowerTestFS* fs_; + std::unique_ptr file_; + }; + + std::unique_ptr file; + IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg); + + if (s.ok() && test::GetFileType(fname) == kDescriptorFile) { + uint64_t size = 0; + EXPECT_EQ(target()->GetFileSize(fname, IOOptions(), &size, nullptr), + IOStatus::OK()); + result->reset(new DBFollowerTestSeqFile(this, std::move(file), size)); + } else { + *result = std::move(file); + } + return s; + } + + void BarrierInit(int count) { + MutexLock l(&mutex_); + barrier_ = true; + count_ = count; + } + + void BarrierWait() { + MutexLock l(&mutex_); + if (!barrier_) { + return; + } + if (--count_ == 0) { + if (reinit_count_ > 0) { + count_ = reinit_count_; + reinit_count_ = 0; + } else { + barrier_ = false; + } + cv_.SignalAll(); + } else { + cv_.Wait(); + } + } + + void BarrierWaitAndReinit(int count) { + MutexLock l(&mutex_); + if (!barrier_) { + return; + } + reinit_count_ = count; + if (--count_ == 0) { + if (reinit_count_ > 0) { + count_ = reinit_count_; + reinit_count_ = 0; + } else { + barrier_ = false; + } + cv_.SignalAll(); + } else { + cv_.Wait(); + } + } + + private: + port::Mutex mutex_; + port::CondVar cv_; + bool barrier_; + int count_; + int reinit_count_; + }; + + class DBFollowerTestSstPartitioner : public SstPartitioner { + public: + explicit DBFollowerTestSstPartitioner(uint64_t max_keys) + : max_keys_(max_keys), num_keys_(0) {} + + const char* Name() const override { return "DBFollowerTestSstPartitioner"; } + + PartitionerResult ShouldPartition( + const PartitionerRequest& /*request*/) override { + if (++num_keys_ > max_keys_) { + num_keys_ = 0; + return PartitionerResult::kRequired; + } else { + return PartitionerResult::kNotRequired; + } + } + + bool CanDoTrivialMove(const Slice& /*smallest_user_key*/, + const Slice& /*largest_user_key*/) override { + return true; + } + + private: + uint64_t max_keys_; + uint64_t num_keys_; + }; + + class DBFollowerTestSstPartitionerFactory : public SstPartitionerFactory { + public: + explicit DBFollowerTestSstPartitionerFactory(uint64_t max_keys) + : max_keys_(max_keys) {} + + std::unique_ptr CreatePartitioner( + const SstPartitioner::Context& /*context*/) const override { + std::unique_ptr partitioner; + partitioner.reset(new DBFollowerTestSstPartitioner(max_keys_)); + return partitioner; + } + + const char* Name() const override { + return "DBFollowerTestSstPartitionerFactory"; + } + + private: + uint64_t max_keys_; + }; + Status OpenAsFollower() { - return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, - &follower_); + Options opts = CurrentOptions(); + if (!follower_env_) { + follower_env_ = NewCompositeEnv( + std::make_shared(env_->GetFileSystem())); + } + opts.env = follower_env_.get(); + opts.follower_refresh_catchup_period_ms = 100; + return DB::OpenAsFollower(opts, follower_name_, dbname_, &follower_); } + + std::string FollowerGet(const std::string& k) { + ReadOptions options; + options.verify_checksums = true; + std::string result; + Status s = follower()->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + DB* follower() { return follower_.get(); } + DBFollowerTestFS* follower_fs() { + return static_cast(follower_env_->GetFileSystem().get()); + } + + void CheckDirs() { + std::vector db_children; + std::vector follower_children; + EXPECT_OK(env_->GetChildren(dbname_, &db_children)); + 
EXPECT_OK(env_->GetChildren(follower_name_, &follower_children)); + + std::set db_filenums; + std::set follower_filenums; + for (auto& name : db_children) { + if (test::GetFileType(name) != kTableFile) { + continue; + } + db_filenums.insert(test::GetFileNumber(name)); + } + for (auto& name : follower_children) { + if (test::GetFileType(name) != kTableFile) { + continue; + } + follower_filenums.insert(test::GetFileNumber(name)); + } + db_filenums.merge(follower_filenums); + EXPECT_EQ(follower_filenums.size(), db_filenums.size()); + } private: std::string follower_name_; + std::string db_parent_; + std::unique_ptr follower_env_; std::unique_ptr follower_; }; @@ -51,8 +250,273 @@ TEST_F(DBFollowerTest, Basic) { std::string val; ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); ASSERT_EQ(val, "v1"); + CheckDirs(); } +TEST_F(DBFollowerTest, Flush) { + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, + {"Leader::Done", "DBImplFollower::TryCatchupWithLeader:Begin2"}, + {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(OpenAsFollower()); + TEST_SYNC_POINT("Leader::Start"); + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + TEST_SYNC_POINT("Leader::Done"); + + TEST_SYNC_POINT("Follower::WaitForCatchup"); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); + CheckDirs(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test creates 4 L0 files, immediately followed by a compaction to L1. +// The follower replays the 4 flush records from the MANIFEST unsuccessfully, +// and then successfully recovers a Version from the compaction record +TEST_F(DBFollowerTest, RetryCatchup) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + Reopen(opts); + + ASSERT_OK(OpenAsFollower()); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, + {"DBImpl::BackgroundCompaction:Start", + "DBImplFollower::TryCatchupWithLeader:Begin2"}, + {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1", + "DBImpl::BackgroundCompaction:BeforeCompaction"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"}, + {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Leader::Start"); + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v3")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "v4")); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + + TEST_SYNC_POINT("Follower::WaitForCatchup"); + ASSERT_EQ(FollowerGet("k1"), "v4"); + CheckDirs(); + + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test validates the same as the previous test, except there is a +// MANIFEST rollover between the flushes and compaction. The follower +// does not switch to a new MANIFEST in ReadAndApply. So it would require +// another round of refresh before catching up. 
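+// The test therefore waits through two catch up attempts
+// (Follower::WaitForCatchup:1 and Follower::WaitForCatchup:2) before
+// verifying the follower's data.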
+TEST_F(DBFollowerTest, RetryCatchupManifestRollover) {
+  Options opts = CurrentOptions();
+  opts.disable_auto_compactions = true;
+  Reopen(opts);
+
+  ASSERT_OK(Put("k1", "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("k1", "v2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("k1", "v3"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(OpenAsFollower());
+  SyncPoint::GetInstance()->LoadDependency({
+      {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
+      {"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"},
+      {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
+       "Leader::Done"},
+      {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
+       "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
+      {"DBImplFollower::TryCatchupWithLeader:End",
+       "Follower::WaitForCatchup:1"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+  TEST_SYNC_POINT("Leader::Start");
+  ASSERT_OK(Put("k1", "v4"));
+  ASSERT_OK(Flush());
+
+  TEST_SYNC_POINT("Leader::Flushed");
+  TEST_SYNC_POINT("Leader::Done");
+  Reopen(opts);
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+
+  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
+  SyncPoint::GetInstance()->LoadDependency({
+      {"DBImplFollower::TryCatchupWithLeader:End",
+       "Follower::WaitForCatchup:2"},
+  });
+  TEST_SYNC_POINT("Follower::WaitForCatchup:2");
+  ASSERT_EQ(FollowerGet("k1"), "v4");
+
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// This test creates 4 L0 files and compacts them. The follower, during
+// catchup, successfully instantiates 4 Versions corresponding to the 4 files
+// (but doesn't install them yet), followed by deleting those 4 and adding a
+// new file from compaction. The test verifies that the 4 L0 files are deleted
+// correctly by the follower.
+// We use the Barrier* functions to ensure that the follower first sees the 4
+// L0 files and is able to link them, and then sees the compaction that
+// obsoletes those L0 files (so those L0 files are intermediates that it has
+// to explicitly delete). Without the barriers, it's possible that the
+// follower reads the L0 records and the compaction records from the MANIFEST
+// in one read, in which case those L0 files would already have been deleted
+// by the leader and the follower could not link to them.
+TEST_F(DBFollowerTest, IntermediateObsoleteFiles) {
+  Options opts = CurrentOptions();
+  opts.disable_auto_compactions = true;
+  Reopen(opts);
+  ASSERT_OK(OpenAsFollower());
+
+  follower_fs()->BarrierInit(2);
+  ASSERT_OK(Put("k1", "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("k1", "v2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("k1", "v3"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("k1", "v4"));
+  ASSERT_OK(Flush());
+  follower_fs()->BarrierWaitAndReinit(2);
+
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+  follower_fs()->BarrierWait();
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"DBImplFollower::TryCatchupWithLeader:End",
+       "Follower::WaitForCatchup:1"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
+  CheckDirs();
+  ASSERT_EQ(FollowerGet("k1"), "v4");
+}
+
+// This test verifies a scenario where the follower can recover a Version
+// partially (i.e. some of the additions cannot be found), and the files
+// that are found are obsoleted by a subsequent VersionEdit.
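+// Such found-but-then-deleted files are tracked in cf_to_found_files_ and
+// recorded in intermediate_files_, because they are never added to
+// VersionStorageInfo and so would not be cleaned up by PurgeObsoleteFiles.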
+TEST_F(DBFollowerTest, PartialVersionRecovery) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.sst_partitioner_factory = + std::make_shared(1); + Reopen(opts); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Put("k2", "v1")); + ASSERT_OK(Put("k3", "v1")); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k3", "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + ASSERT_OK(OpenAsFollower()); + ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(), + {{"max_compaction_bytes", "1"}})); + + follower_fs()->BarrierInit(2); + Slice key("k1"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + + follower_fs()->BarrierWaitAndReinit(2); + + // The second compaction input overlaps the previous compaction outputs + // by one file. This file is never added to VersionStorageInfo since it + // was added and deleted before the catch up completes. We later verify that + // the follower correctly deleted this file. + key = Slice("k3"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + follower_fs()->BarrierWait(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + CheckDirs(); + ASSERT_EQ(FollowerGet("k1"), "v2"); + ASSERT_EQ(FollowerGet("k2"), "v1"); + ASSERT_EQ(FollowerGet("k3"), "v2"); + SyncPoint::GetInstance()->DisableProcessing(); +} + +// This test verifies a scenario similar to the PartialVersionRecovery, except +// with a MANIFEST rollover in between. When there is a rollover, the +// follower's attempt ends without installing a new Version. The next catch up +// attempt will recover a full Version. 
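+// The links created during the failed attempt are preserved by inserting the
+// next file number into pending_outputs_ (case 1 in the summary above), so
+// the subsequent attempt can succeed.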
+TEST_F(DBFollowerTest, PartialVersionRecoveryWithRollover) { + Options opts = CurrentOptions(); + opts.disable_auto_compactions = true; + opts.sst_partitioner_factory = + std::make_shared(1); + Reopen(opts); + + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Put("k2", "v1")); + ASSERT_OK(Put("k3", "v1")); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + + ASSERT_OK(Put("k1", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k3", "v2")); + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + opts.max_compaction_bytes = 1; + Reopen(opts); + + ASSERT_OK(OpenAsFollower()); + + follower_fs()->BarrierInit(2); + Slice key("k1"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + + follower_fs()->BarrierWaitAndReinit(2); + Reopen(opts); + key = Slice("k3"); + ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true)); + follower_fs()->BarrierWait(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:Begin1", + "Follower::WaitForCatchup:1"}, + {"Follower::WaitForCatchup:2", + "DBImplFollower::TryCatchupWithLeader:Begin2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + TEST_SYNC_POINT("Follower::WaitForCatchup:1"); + TEST_SYNC_POINT("Follower::WaitForCatchup:2"); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImplFollower::TryCatchupWithLeader:End", + "Follower::WaitForCatchup:3"}, + }); + TEST_SYNC_POINT("Follower::WaitForCatchup:3"); + CheckDirs(); + ASSERT_EQ(FollowerGet("k1"), "v2"); + ASSERT_EQ(FollowerGet("k2"), "v1"); + ASSERT_EQ(FollowerGet("k3"), "v2"); + SyncPoint::GetInstance()->DisableProcessing(); +} #endif } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8192269ed2..40acbc96f7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1648,6 +1648,7 @@ class DBImpl : public DB { friend class ForwardIterator; friend struct SuperVersion; friend class CompactedDBImpl; + friend class DBImplFollower; #ifndef NDEBUG friend class DBTest_ConcurrentFlushWAL_Test; friend class DBTest_MixedSlowdownOptionsStop_Test; diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc index 8d21f530cd..a8c736f1bf 100644 --- a/db/db_impl/db_impl_follower.cc +++ b/db/db_impl/db_impl_follower.cc @@ -95,17 +95,28 @@ Status DBImplFollower::TryCatchUpWithLeader() { assert(versions_.get() != nullptr); assert(manifest_reader_.get() != nullptr); Status s; + + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin1"); + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin2"); // read the manifest and apply new changes to the follower instance std::unordered_set cfds_changed; JobContext job_context(0, true /*create_superversion*/); { InstrumentedMutexLock lock_guard(&mutex_); + std::vector files_to_delete; s = static_cast_with_check(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, - manifest_reader_status_.get(), &cfds_changed); + manifest_reader_status_.get(), &cfds_changed, + &files_to_delete); + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_); + pending_outputs_inserted_elem_.reset(new std::list::iterator( + CaptureCurrentFileNumberInPendingOutputs())); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, static_cast(versions_->LastSequence())); + ROCKS_LOG_INFO( + immutable_db_options_.info_log, "Next file number is %" PRIu64, + static_cast(versions_->current_next_file_number())); for (ColumnFamilyData* cfd : cfds_changed) { if (cfd->IsDropped()) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is 
dropped\n", @@ -147,9 +158,33 @@ Status DBImplFollower::TryCatchUpWithLeader() { sv_context.NewSuperVersion(); } } + + for (auto& file : files_to_delete) { + IOStatus io_s = fs_->DeleteFile(file, IOOptions(), nullptr); + if (!io_s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Cannot delete file %s: %s", file.c_str(), + io_s.ToString().c_str()); + } + } } job_context.Clean(); + // Cleanup unused, obsolete files. + JobContext purge_files_job_context(0); + { + InstrumentedMutexLock lock_guard(&mutex_); + // Currently, follower instance does not create any database files, thus + // is unnecessary for the follower to force full scan. + FindObsoleteFiles(&purge_files_job_context, /*force=*/false); + } + if (purge_files_job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(purge_files_job_context); + } + purge_files_job_context.Clean(); + + TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:End"); + return s; } @@ -199,6 +234,8 @@ Status DBImplFollower::Close() { catch_up_thread_.reset(); } + ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_); + return DBImpl::Close(); } diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h index 60992c111e..374c60d5c6 100644 --- a/db/db_impl/db_impl_follower.h +++ b/db/db_impl/db_impl_follower.h @@ -27,7 +27,7 @@ class DBImplFollower : public DBImplSecondary { bool OwnTablesAndLogs() const override { // TODO: Change this to true once we've properly implemented file // deletion for the read scaling case - return false; + return true; } Status Recover(const std::vector& column_families, @@ -49,5 +49,6 @@ class DBImplFollower : public DBImplSecondary { std::string src_path_; port::Mutex mu_; port::CondVar cv_; + std::unique_ptr::iterator> pending_outputs_inserted_elem_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index a5509c4f12..92944d1181 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -680,7 +680,8 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { InstrumentedMutexLock lock_guard(&mutex_); s = static_cast_with_check(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, - manifest_reader_status_.get(), &cfds_changed); + manifest_reader_status_.get(), &cfds_changed, + /*files_to_delete=*/nullptr); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, static_cast(versions_->LastSequence())); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 42bdcd3c33..3284768046 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -152,7 +152,7 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit, VersionEditHandler::VersionEditHandler( bool read_only, std::vector column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr& io_tracer, const ReadOptions& read_options, bool skip_load_table_files, EpochNumberRequirement epoch_number_requirement) @@ -160,7 +160,7 @@ VersionEditHandler::VersionEditHandler( read_only_(read_only), column_families_(std::move(column_families)), version_set_(version_set), - track_missing_files_(track_missing_files), + track_found_and_missing_files_(track_found_and_missing_files), no_error_if_files_missing_(no_error_if_files_missing), io_tracer_(io_tracer), skip_load_table_files_(skip_load_table_files), @@ -500,7 +500,8 @@ ColumnFamilyData* 
VersionEditHandler::CreateCfAndInit( assert(builders_.find(cf_id) == builders_.end()); builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd))); - if (track_missing_files_) { + if (track_found_and_missing_files_) { + cf_to_found_files_.emplace(cf_id, std::unordered_set()); cf_to_missing_files_.emplace(cf_id, std::unordered_set()); cf_to_missing_blob_files_high_.emplace(cf_id, kInvalidBlobFileNumber); } @@ -513,7 +514,11 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup( auto builder_iter = builders_.find(cf_id); assert(builder_iter != builders_.end()); builders_.erase(builder_iter); - if (track_missing_files_) { + if (track_found_and_missing_files_) { + auto found_files_iter = cf_to_found_files_.find(cf_id); + assert(found_files_iter != cf_to_found_files_.end()); + cf_to_found_files_.erase(found_files_iter); + auto missing_files_iter = cf_to_missing_files_.find(cf_id); assert(missing_files_iter != cf_to_missing_files_.end()); cf_to_missing_files_.erase(missing_files_iter); @@ -729,7 +734,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( const ReadOptions& read_options, EpochNumberRequirement epoch_number_requirement) : VersionEditHandler(read_only, column_families, version_set, - /*track_missing_files=*/true, + /*track_found_and_missing_files=*/true, /*no_error_if_files_missing=*/true, io_tracer, read_options, epoch_number_requirement) {} @@ -824,6 +829,12 @@ void VersionEditHandlerPointInTime::CheckIterationResult( version_set_->AppendVersion(cfd, v_iter->second); versions_.erase(v_iter); + // Let's clear found_files, since any files in that are part of the + // installed Version. Any files that got obsoleted would have already + // been moved to intermediate_files_ + auto found_files_iter = cf_to_found_files_.find(cfd->GetID()); + assert(found_files_iter != cf_to_found_files_.end()); + found_files_iter->second.clear(); } } } else { @@ -854,10 +865,16 @@ ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup( Status VersionEditHandlerPointInTime::MaybeCreateVersion( const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) { + TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1"); + TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"); assert(cfd != nullptr); if (!force_create_version) { assert(edit.GetColumnFamily() == cfd->GetID()); } + auto found_files_iter = cf_to_found_files_.find(cfd->GetID()); + assert(found_files_iter != cf_to_found_files_.end()); + std::unordered_set& found_files = found_files_iter->second; + auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID()); assert(missing_files_iter != cf_to_missing_files_.end()); std::unordered_set& missing_files = missing_files_iter->second; @@ -889,6 +906,18 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( auto fiter = missing_files.find(file_num); if (fiter != missing_files.end()) { missing_files.erase(fiter); + } else { + fiter = found_files.find(file_num); + // Only mark new files added during this catchup attempt for deletion. + // These files were never installed in VersionStorageInfo. + // Already referenced files that are deleted by a VersionEdit will + // be added to the VersionStorageInfo's obsolete files when the old + // version is dereferenced. 
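+      // Those obsolete files are then reclaimed by the follower's normal
+      // FindObsoleteFiles()/PurgeObsoleteFiles() pass after catch up.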
+ if (fiter != found_files.end()) { + intermediate_files_.emplace_back( + MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num)); + found_files.erase(fiter); + } } } @@ -904,9 +933,14 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( s = VerifyFile(cfd, fpath, level, meta); if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) { missing_files.insert(file_num); + if (s.IsCorruption()) { + found_files.insert(file_num); + } s = Status::OK(); } else if (!s.ok()) { break; + } else { + found_files.insert(file_num); } } diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 4caa9c0898..c918c6ec98 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -104,7 +104,7 @@ using VersionBuilderUPtr = std::unique_ptr; // To use this class and its subclasses, // 1. Create an object of VersionEditHandler or its subclasses. // VersionEditHandler handler(read_only, column_families, version_set, -// track_missing_files, +// track_found_and_missing_files, // no_error_if_files_missing); // 2. Status s = handler.Iterate(reader, &db_id); // 3. Check s and handle possible errors. @@ -116,16 +116,17 @@ class VersionEditHandler : public VersionEditHandlerBase { explicit VersionEditHandler( bool read_only, const std::vector& column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr& io_tracer, const ReadOptions& read_options, EpochNumberRequirement epoch_number_requirement = EpochNumberRequirement::kMustPresent) - : VersionEditHandler( - read_only, column_families, version_set, track_missing_files, - no_error_if_files_missing, io_tracer, read_options, - /*skip_load_table_files=*/false, epoch_number_requirement) {} + : VersionEditHandler(read_only, column_families, version_set, + track_found_and_missing_files, + no_error_if_files_missing, io_tracer, read_options, + /*skip_load_table_files=*/false, + epoch_number_requirement) {} ~VersionEditHandler() override {} @@ -144,7 +145,7 @@ class VersionEditHandler : public VersionEditHandlerBase { protected: explicit VersionEditHandler( bool read_only, std::vector column_families, - VersionSet* version_set, bool track_missing_files, + VersionSet* version_set, bool track_found_and_missing_files, bool no_error_if_files_missing, const std::shared_ptr& io_tracer, const ReadOptions& read_options, bool skip_load_table_files, @@ -195,7 +196,8 @@ class VersionEditHandler : public VersionEditHandlerBase { // by subsequent manifest records, Recover() will return failure status. 
std::unordered_map column_families_not_found_; VersionEditParams version_edit_params_; - const bool track_missing_files_; + const bool track_found_and_missing_files_; + std::unordered_map> cf_to_found_files_; std::unordered_map> cf_to_missing_files_; std::unordered_map cf_to_missing_blob_files_high_; @@ -273,6 +275,8 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { bool in_atomic_group_ = false; + std::vector intermediate_files_; + private: bool AtomicUpdateVersionsCompleted(); bool AtomicUpdateVersionsContains(uint32_t cfid); @@ -310,6 +314,10 @@ class ManifestTailer : public VersionEditHandlerPointInTime { return cfds_changed_; } + std::vector& GetIntermediateFiles() { + return intermediate_files_; + } + protected: Status Initialize() override; @@ -342,7 +350,7 @@ class DumpManifestHandler : public VersionEditHandler { bool json) : VersionEditHandler( /*read_only=*/true, column_families, version_set, - /*track_missing_files=*/false, + /*track_found_and_missing_files=*/false, /*no_error_if_files_missing=*/false, io_tracer, read_options, /*skip_load_table_files=*/true), verbose_(verbose), diff --git a/db/version_set.cc b/db/version_set.cc index ffcf210dc4..f8dfff5bd8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6063,8 +6063,8 @@ Status VersionSet::Recover( true /* checksum */, 0 /* log_number */); VersionEditHandler handler( read_only, column_families, const_cast(this), - /*track_missing_files=*/false, no_error_if_files_missing, io_tracer_, - read_options, EpochNumberRequirement::kMightMissing); + /*track_found_and_missing_files=*/false, no_error_if_files_missing, + io_tracer_, read_options, EpochNumberRequirement::kMightMissing); handler.Iterate(reader, &log_read_status); s = handler.status(); if (s.ok()) { @@ -7439,7 +7439,8 @@ Status ReactiveVersionSet::ReadAndApply( InstrumentedMutex* mu, std::unique_ptr* manifest_reader, Status* manifest_read_status, - std::unordered_set* cfds_changed) { + std::unordered_set* cfds_changed, + std::vector* files_to_delete) { assert(manifest_reader != nullptr); assert(cfds_changed != nullptr); mu->AssertHeld(); @@ -7456,6 +7457,9 @@ Status ReactiveVersionSet::ReadAndApply( if (s.ok()) { *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies()); } + if (files_to_delete) { + *files_to_delete = std::move(manifest_tailer_->GetIntermediateFiles()); + } return s; } diff --git a/db/version_set.h b/db/version_set.h index ec98c77402..ba86878d40 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1741,7 +1741,8 @@ class ReactiveVersionSet : public VersionSet { InstrumentedMutex* mu, std::unique_ptr* manifest_reader, Status* manifest_read_status, - std::unordered_set* cfds_changed); + std::unordered_set* cfds_changed, + std::vector* files_to_delete); Status Recover(const std::vector& column_families, std::unique_ptr* manifest_reader, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index f6a983d6b2..d4b748db7b 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -2742,7 +2742,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set cfds_changed; mu.Lock(); EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_TRUE(last_in_atomic_group_); @@ -2797,7 +2798,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set cfds_changed; mu.Lock(); 
EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); // Reactive version set should be empty now. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); @@ -2826,7 +2828,8 @@ TEST_F(VersionSetAtomicGroupTest, std::unordered_set cfds_changed; mu.Lock(); EXPECT_OK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_FALSE(last_in_atomic_group_); @@ -2882,7 +2885,8 @@ TEST_F(VersionSetAtomicGroupTest, AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); EXPECT_NOK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), corrupted_edit_.DebugString()); @@ -2932,7 +2936,8 @@ TEST_F(VersionSetAtomicGroupTest, AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); EXPECT_NOK(reactive_versions_->ReadAndApply( - &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed, + /*files_to_delete=*/nullptr)); mu.Unlock(); EXPECT_EQ(edits_[1].DebugString(), edit_with_incorrect_group_size_.DebugString()); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 5372126ef5..ab926cef21 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -565,6 +565,30 @@ void DeleteDir(Env* env, const std::string& dirname) { TryDeleteDir(env, dirname).PermitUncheckedError(); } +FileType GetFileType(const std::string& path) { + FileType type = kTempFile; + std::size_t found = path.find_last_of('/'); + if (found == std::string::npos) { + found = 0; + } + std::string file_name = path.substr(found); + uint64_t number = 0; + ParseFileName(file_name, &number, &type); + return type; +} + +uint64_t GetFileNumber(const std::string& path) { + FileType type = kTempFile; + std::size_t found = path.find_last_of('/'); + if (found == std::string::npos) { + found = 0; + } + std::string file_name = path.substr(found); + uint64_t number = 0; + ParseFileName(file_name, &number, &type); + return number; +} + Status CreateEnvFromSystem(const ConfigOptions& config_options, Env** result, std::shared_ptr* guard) { const char* env_uri = getenv("TEST_ENV_URI"); diff --git a/test_util/testutil.h b/test_util/testutil.h index b3fa0954cb..02accdc521 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -882,6 +882,12 @@ Status TryDeleteDir(Env* env, const std::string& dirname); // Delete a directory if it exists void DeleteDir(Env* env, const std::string& dirname); +// Find the FileType from the file path +FileType GetFileType(const std::string& path); + +// Get the file number given the file path +uint64_t GetFileNumber(const std::string& path); + // Creates an Env from the system environment by looking at the system // environment variables. Status CreateEnvFromSystem(const ConfigOptions& options, Env** result,
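
For context, here is a minimal usage sketch (not part of this patch) of the leader/follower pair that the tests above drive. It assumes only the public API used in this change, DB::OpenAsFollower() and options.follower_refresh_catchup_period_ms; the database paths are hypothetical and error handling is reduced to asserts.

// Minimal usage sketch of the follower API exercised by the tests above.
// Paths and values are hypothetical; a real deployment would share the
// leader directory with the follower host (e.g. over a distributed FS).
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::FlushOptions;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::ReadOptions;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::WriteOptions;

int main() {
  const std::string kLeaderPath = "/tmp/leader_db";      // hypothetical
  const std::string kFollowerPath = "/tmp/follower_db";  // hypothetical

  // Open the leader and persist a key.
  Options options;
  options.create_if_missing = true;
  DB* leader = nullptr;
  Status s = DB::Open(options, kLeaderPath, &leader);
  assert(s.ok());
  s = leader->Put(WriteOptions(), "k1", "v1");
  assert(s.ok());
  s = leader->Flush(FlushOptions());
  assert(s.ok());

  // Open a follower that tails the leader's MANIFEST. Every
  // follower_refresh_catchup_period_ms it catches up, links newly added SST
  // files into kFollowerPath, and (with this change) deletes links that have
  // become obsolete.
  Options follower_options = options;
  follower_options.follower_refresh_catchup_period_ms = 100;
  std::unique_ptr<DB> follower;
  s = DB::OpenAsFollower(follower_options, kFollowerPath, kLeaderPath,
                         &follower);
  assert(s.ok());

  // Once a catch up cycle has run, the follower serves the leader's data
  // read-only.
  std::string value;
  s = follower->Get(ReadOptions(), "k1", &value);  // expect value == "v1"

  follower.reset();
  delete leader;
  return s.ok() ? 0 : 1;
}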