diff --git a/db/flush_job.h b/db/flush_job.h index 8cc8821bf5..1c1f15d1b1 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -206,6 +206,7 @@ class FlushJob { // Variables below are set by PickMemTable(): FileMetaData meta_; // Memtables to be flushed by this job. + // Ordered by increasing memtable id, i.e., oldest memtable first. autovector mems_; VersionEdit* edit_; Version* base_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 4e1b38b81f..05b085f878 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -553,7 +553,6 @@ Status MemTableList::TryInstallMemtableFlushResults( // (in that order) that have finished flushing. Memtables // are always committed in the order that they were created. uint64_t batch_file_number = 0; - size_t batch_count = 0; autovector edit_list; autovector memtables_to_flush; // enumerate from the last (earliest) element to see how many batch finished @@ -563,6 +562,7 @@ Status MemTableList::TryInstallMemtableFlushResults( break; } if (it == memlist.rbegin() || batch_file_number != m->file_number_) { + // Oldest memtable in a new batch. batch_file_number = m->file_number_; if (m->edit_.GetBlobFileAdditions().empty()) { ROCKS_LOG_BUFFER(log_buffer, @@ -578,17 +578,17 @@ Status MemTableList::TryInstallMemtableFlushResults( } edit_list.push_back(&m->edit_); - memtables_to_flush.push_back(m); std::unique_ptr info = m->ReleaseFlushJobInfo(); if (info != nullptr) { committed_flush_jobs_info->push_back(std::move(info)); } } - batch_count++; + memtables_to_flush.push_back(m); } + size_t num_mem_to_flush = memtables_to_flush.size(); // TODO(myabandeh): Not sure how batch_count could be 0 here. - if (batch_count > 0) { + if (num_mem_to_flush > 0) { VersionEdit edit; #ifdef ROCKSDB_ASSERT_STATUS_CHECKED if (memtables_to_flush.size() == memlist.size()) { @@ -612,9 +612,9 @@ Status MemTableList::TryInstallMemtableFlushResults( nullptr); edit_list.push_back(&edit); - const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, + const auto manifest_write_cb = [this, cfd, num_mem_to_flush, log_buffer, to_delete, mu](const Status& status) { - RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer, + RemoveMemTablesOrRestoreFlags(status, cfd, num_mem_to_flush, log_buffer, to_delete, mu); }; if (write_edits) { @@ -627,7 +627,7 @@ Status MemTableList::TryInstallMemtableFlushResults( // If write_edit is false (e.g: successful mempurge), // then remove old memtables, wake up manifest write queue threads, // and don't commit anything to the manifest file. - RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer, + RemoveMemTablesOrRestoreFlags(s, cfd, num_mem_to_flush, log_buffer, to_delete, mu); // Note: cfd->SetLogNumber is only called when a VersionEdit // is written to MANIFEST. When mempurge is succesful, we skip @@ -735,7 +735,7 @@ void MemTableList::InstallNewVersion() { } void MemTableList::RemoveMemTablesOrRestoreFlags( - const Status& s, ColumnFamilyData* cfd, size_t batch_count, + const Status& s, ColumnFamilyData* cfd, size_t num_mem_to_flush, LogBuffer* log_buffer, autovector* to_delete, InstrumentedMutex* mu) { assert(mu); @@ -764,8 +764,11 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( // read full data as long as column family handle is not deleted, even if // the column family is dropped. if (s.ok() && !cfd->IsDropped()) { // commit new state - while (batch_count-- > 0) { + while (num_mem_to_flush-- > 0) { ReadOnlyMemTable* m = current_->memlist_.back(); + // TODO: The logging can be redundant when we flush multiple memtables + // into one SST file. We should only check the edit_ of the oldest + // memtable in the group in that case. if (m->edit_.GetBlobFileAdditions().empty()) { ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit flush result of table #%" PRIu64 @@ -787,7 +790,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( ++mem_id; } } else { - for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) { + for (auto it = current_->memlist_.rbegin(); num_mem_to_flush-- > 0; ++it) { ReadOnlyMemTable* m = *it; // commit failed. setup state so that we can flush again. if (m->edit_.GetBlobFileAdditions().empty()) { @@ -816,7 +819,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( } uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( - const std::unordered_set* memtables_to_flush) { + const std::unordered_set* memtables_to_flush) const { uint64_t min_log = 0; for (auto& m : current_->memlist_) { diff --git a/db/memtable_list.h b/db/memtable_list.h index 48075768cf..d21a53d6b4 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -395,11 +395,13 @@ class MemTableList { size_t* current_memory_usage() { return ¤t_memory_usage_; } - // Returns the min log containing the prep section after memtables listsed in - // `memtables_to_flush` are flushed and their status is persisted in manifest. + // Returns the WAL number of the oldest WAL that contains a prepared + // transaction that corresponds to the content in this MemTableList, + // after memtables listed in `memtables_to_flush` are flushed and their + // status is persisted in manifest. uint64_t PrecomputeMinLogContainingPrepSection( const std::unordered_set* memtables_to_flush = - nullptr); + nullptr) const; uint64_t GetEarliestMemTableID() const { auto& memlist = current_->memlist_; diff --git a/unreleased_history/bug_fixes/old_wal_2pc.md b/unreleased_history/bug_fixes/old_wal_2pc.md new file mode 100644 index 0000000000..4989c80a88 --- /dev/null +++ b/unreleased_history/bug_fixes/old_wal_2pc.md @@ -0,0 +1 @@ +* Fix a bug for transaction db with 2pc where an old WAL may be retained longer than needed (#13127). \ No newline at end of file diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 37c11874aa..a089605d37 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2202,6 +2202,46 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { delete cfa; delete cfb; } + +TEST_P(TransactionTest, TwoPhaseLogMultiMemtableFlush) { + // Test that min log number to keep is tracked correctly when + // multiple memtables are flushed together. + DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); + // So that two immutable memtable won't stall writes. + ASSERT_OK(db->SetOptions({{"max_write_buffer_number", "4"}})); + // Pause flush. + ASSERT_OK(db->PauseBackgroundWork()); + + WriteOptions wopts; + wopts.disableWAL = false; + wopts.sync = true; + TransactionOptions topts; + Transaction* txn1 = db->BeginTransaction(wopts, topts); + ASSERT_OK(txn1->Put("key1", "val1")); + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->Commit()); + delete txn1; + + ASSERT_OK(db_impl->TEST_SwitchMemtable()); + + Transaction* txn2 = db->BeginTransaction(wopts, topts); + ASSERT_OK(txn2->Put("key2", "val2")); + ASSERT_OK(txn2->SetName("xid2")); + ASSERT_OK(txn2->Prepare()); + ASSERT_OK(txn2->Commit()); + delete txn2; + + ASSERT_OK(db_impl->TEST_SwitchMemtable()); + + ASSERT_OK(db->ContinueBackgroundWork()); + ASSERT_OK(db->Flush({})); + + uint64_t cur_wal_num = db_impl->TEST_GetCurrentLogNumber(); + // All non-active WALs should be obsolete. + ASSERT_EQ(cur_wal_num, db_impl->MinLogNumberToKeep()); +} + /* * 1) use prepare to keep first log around to determine starting sequence * during recovery.