From 9375c3b6354770941a9c48aec40e3a2f0f815ac0 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Wed, 2 Oct 2024 17:19:18 -0700 Subject: [PATCH] Fix `needs_flush` assertion in file ingestion (#13045) Summary: This PR makes file ingestion job's flush wait a bit further until the SuperVersion is also updated. This is necessary since follow up operations will use the current SuperVersion to do range overlapping check and level assignment. In debug mode, file ingestion job's second `NeedsFlush` call could have been invoked when the memtables are flushed but the SuperVersion hasn't been updated yet, triggering the assertion. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13045 Test Plan: Existing tests Manually stress tested Reviewed By: cbi42 Differential Revision: D63671151 Pulled By: jowlyzhang fbshipit-source-id: 95a169e58a7e59f6dd4125e7296e9060fe4c63a7 --- db/column_family.h | 6 +++--- db/db_impl/db_impl.h | 11 ++++++----- db/db_impl/db_impl_compaction_flush.cc | 20 +++++++++++++++----- db/memtable_list.cc | 15 ++++++++++++++- db/memtable_list.h | 21 +++++++++++++++++++-- include/rocksdb/options.h | 1 + 6 files changed, 58 insertions(+), 16 deletions(-) diff --git a/db/column_family.h b/db/column_family.h index e4b7adde89..a72527bd29 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -385,9 +385,9 @@ class ColumnFamilyData { uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held uint64_t GetLiveSstFilesSize() const; // REQUIRE: DB mutex held uint64_t GetTotalBlobFileSize() const; // REQUIRE: DB mutex held + // REQUIRE: DB mutex held void SetMemtable(MemTable* new_mem) { - uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1; - new_mem->SetID(memtable_id); + new_mem->SetID(++last_memtable_id_); mem_ = new_mem; } @@ -669,7 +669,7 @@ class ColumnFamilyData { bool allow_2pc_; // Memtable id to track flush. - std::atomic last_memtable_id_; + uint64_t last_memtable_id_; // Directories corresponding to cf_paths. std::vector> data_dirs_; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 218c1851e6..e511c67260 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2076,17 +2076,18 @@ class DBImpl : public DB { // memtable pending flush. // resuming_from_bg_err indicates whether the caller is attempting to resume // from background error. - Status WaitForFlushMemTable(ColumnFamilyData* cfd, - const uint64_t* flush_memtable_id = nullptr, - bool resuming_from_bg_err = false) { + Status WaitForFlushMemTable( + ColumnFamilyData* cfd, const uint64_t* flush_memtable_id = nullptr, + bool resuming_from_bg_err = false, + std::optional flush_reason = std::nullopt) { return WaitForFlushMemTables({cfd}, {flush_memtable_id}, - resuming_from_bg_err); + resuming_from_bg_err, flush_reason); } // Wait for memtables to be flushed for multiple column families. Status WaitForFlushMemTables( const autovector& cfds, const autovector& flush_memtable_ids, - bool resuming_from_bg_err); + bool resuming_from_bg_err, std::optional flush_reason); inline void WaitForPendingWrites() { mutex_.AssertHeld(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 3fb8af4477..df8d6275d0 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2407,7 +2407,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } s = WaitForFlushMemTables( cfds, flush_memtable_ids, - flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */); + flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */, + flush_reason); InstrumentedMutexLock lock_guard(&mutex_); for (auto* tmp_cfd : cfds) { tmp_cfd->UnrefAndTryDelete(); @@ -2549,7 +2550,8 @@ Status DBImpl::AtomicFlushMemTables( } s = WaitForFlushMemTables( cfds, flush_memtable_ids, - flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */); + flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */, + flush_reason); InstrumentedMutexLock lock_guard(&mutex_); for (auto* cfd : cfds) { cfd->UnrefAndTryDelete(); @@ -2612,7 +2614,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason, flush_memtable_id_ptrs.push_back(&flush_memtable_id); } s = WaitForFlushMemTables(cfds, flush_memtable_id_ptrs, - true /* resuming_from_bg_err */); + true /* resuming_from_bg_err */, flush_reason); mutex_.Lock(); } @@ -2712,7 +2714,7 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, Status DBImpl::WaitForFlushMemTables( const autovector& cfds, const autovector& flush_memtable_ids, - bool resuming_from_bg_err) { + bool resuming_from_bg_err, std::optional flush_reason) { int num = static_cast(cfds.size()); // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); @@ -2750,7 +2752,15 @@ Status DBImpl::WaitForFlushMemTables( (flush_memtable_ids[i] != nullptr && cfds[i]->imm()->GetEarliestMemTableID() > *flush_memtable_ids[i])) { - ++num_finished; + // Make file ingestion's flush wait until SuperVersion is also updated + // since after flush, it does range overlapping check and file level + // assignment with the current SuperVersion. + if (!flush_reason.has_value() || + flush_reason.value() != FlushReason::kExternalFileIngestion || + cfds[i]->GetSuperVersion()->imm->GetID() == + cfds[i]->imm()->current()->GetID()) { + ++num_finished; + } } } if (1 == num_dropped && 1 == num) { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index c81c096b51..8ad4efcc2b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -359,11 +359,15 @@ bool MemTableListVersion::MemtableLimitExceeded(size_t usage) { } } +bool MemTableListVersion::HistoryShouldBeTrimmed(size_t usage) { + return MemtableLimitExceeded(usage) && !memlist_history_.empty(); +} + // Make sure we don't use up too much space in history bool MemTableListVersion::TrimHistory(autovector* to_delete, size_t usage) { bool ret = false; - while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) { + while (HistoryShouldBeTrimmed(usage)) { MemTable* x = memlist_history_.back(); memlist_history_.pop_back(); @@ -661,8 +665,16 @@ void MemTableList::Add(MemTable* m, autovector* to_delete) { } bool MemTableList::TrimHistory(autovector* to_delete, size_t usage) { + // Check if history trim is needed first, so that we can avoid installing a + // new MemTableListVersion without installing a SuperVersion (installed based + // on return value of this function). + if (!current_->HistoryShouldBeTrimmed(usage)) { + ResetTrimHistoryNeeded(); + return false; + } InstallNewVersion(); bool ret = current_->TrimHistory(to_delete, usage); + assert(ret); UpdateCachedValuesFromMemTableListVersion(); ResetTrimHistoryNeeded(); return ret; @@ -714,6 +726,7 @@ void MemTableList::InstallNewVersion() { // somebody else holds the current version, we need to create new one MemTableListVersion* version = current_; current_ = new MemTableListVersion(¤t_memory_usage_, *version); + current_->SetID(++last_memtable_list_version_id_); current_->Ref(); version->Unref(); } diff --git a/db/memtable_list.h b/db/memtable_list.h index 390b4137dd..75afb5018d 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -141,6 +141,11 @@ class MemTableListVersion { // Return kMaxSequenceNumber if the list is empty. SequenceNumber GetFirstSequenceNumber() const; + // REQUIRES: db_mutex held. + void SetID(uint64_t id) { id_ = id; } + + uint64_t GetID() const { return id_; } + private: friend class MemTableList; @@ -161,7 +166,11 @@ class MemTableListVersion { // REQUIRE: m is an immutable memtable void Remove(MemTable* m, autovector* to_delete); - // Return true if memtable is trimmed + // Return true if the memtable list should be trimmed to get memory usage + // under budget. + bool HistoryShouldBeTrimmed(size_t usage); + + // Trim history, Return true if memtable is trimmed bool TrimHistory(autovector* to_delete, size_t usage); bool GetFromList(std::list* list, const LookupKey& key, @@ -205,6 +214,9 @@ class MemTableListVersion { int refs_ = 0; size_t* parent_memtable_list_memory_usage_; + + // MemtableListVersion id to track for flush results checking. + uint64_t id_ = 0; }; // This class stores references to all the immutable memtables. @@ -235,7 +247,8 @@ class MemTableList { flush_requested_(false), current_memory_usage_(0), current_memory_allocted_bytes_excluding_last_(0), - current_has_history_(false) { + current_has_history_(false), + last_memtable_list_version_id_(0) { current_->Ref(); } @@ -500,6 +513,10 @@ class MemTableList { // Cached value of current_->HasHistory(). std::atomic current_has_history_; + + // Last memtabe list version id, increase by 1 each time a new + // MemtableListVersion is installed. + uint64_t last_memtable_list_version_id_; }; // Installs memtable atomic flush results. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9700e25af5..6a444f0bde 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2037,6 +2037,7 @@ struct FlushOptions { // is performed by someone else (foreground call or background thread). // Default: false bool allow_write_stall; + FlushOptions() : wait(true), allow_write_stall(false) {} };