Fix a bug that can retain old WAL longer than needed (#13127)

Summary:
The bug only happens for a transaction DB with two-phase commit (2PC) enabled. The main change is in `MemTableList::TryInstallMemtableFlushResults()`. Before this fix, `memtables_to_flush` might not include all flushed memtables, which caused the min_log_number computed for the flush to be incorrect. The code path for calculating min_log_number is `MemTableList::TryInstallMemtableFlushResults() -> GetDBRecoveryEditForObsoletingMemTables() -> PrecomputeMinLogNumberToKeep2PC() -> FindMinPrepLogReferencedByMemTable()`. Inside `FindMinPrepLogReferencedByMemTable()`, all memtables being flushed need to be excluded.
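
As an illustration of the fix, here is a minimal, self-contained sketch of the exclusion logic. It is not the verbatim RocksDB source; the stub type, field, and function name below are assumptions made for the example. The key point is that every memtable in the flush batch must be skipped when computing the oldest WAL that still holds a prepared section; skipping only a subset of the batch is what kept old WALs alive before this change.

#include <cstdint>
#include <list>
#include <unordered_set>

// Sketch-only stand-in for RocksDB's ReadOnlyMemTable.
struct MemTableStub {
  uint64_t min_prep_log = 0;  // 0 = no prepared section in any WAL.
};

// Returns the oldest WAL number that still holds a prepared section
// referenced by a memtable that is NOT part of the current flush batch.
uint64_t MinPrepLogAfterFlush(
    const std::list<MemTableStub*>& memlist,
    const std::unordered_set<MemTableStub*>& memtables_to_flush) {
  uint64_t min_log = 0;
  for (MemTableStub* m : memlist) {
    // The fix: exclude every memtable being flushed, not just those that
    // happened to be recorded in the flush batch's bookkeeping.
    if (memtables_to_flush.count(m) > 0) {
      continue;
    }
    if (m->min_prep_log > 0 &&
        (min_log == 0 || m->min_prep_log < min_log)) {
      min_log = m->min_prep_log;
    }
  }
  return min_log;  // 0 means no prepared section constrains WAL deletion.
}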

The PR also includes some documentation changes.
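
For background on why 2PC pins WALs at all: the retention rule that `PrecomputeMinLogNumberToKeep2PC()` enforces amounts to the hedged sketch below (a simplified illustration — the real function derives these inputs from the DB's internal state rather than taking two numbers). A WAL is deletable only once it contains neither unflushed data nor a prepared section that is still needed.

#include <algorithm>
#include <cstdint>

// Simplified illustration of the 2PC min-log rule (assumed inputs):
// keep every WAL at or above the older of the two constraints.
uint64_t MinLogNumberToKeep2PC(uint64_t min_log_with_unflushed_data,
                               uint64_t min_log_containing_prep_section) {
  uint64_t min_log = min_log_with_unflushed_data;
  if (min_log_containing_prep_section > 0) {
    // 0 means "no prepared section anywhere"; otherwise the prep section
    // can only lower (never raise) the number of the oldest WAL to keep.
    min_log = std::min(min_log, min_log_containing_prep_section);
  }
  return min_log;
}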

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13127

Test Plan: added a new unit test that fails before this change.

Reviewed By: ltamasi

Differential Revision: D65679270

Pulled By: cbi42

fbshipit-source-id: 611f34bd6ef4cba51f8b54cb1be416887b5a9c5e
Changyu Bi authored on 2024-11-11 14:19:45 -08:00; committed by Facebook GitHub Bot
parent 1f0ccd9a15
commit 925435bbd9
5 changed files with 61 additions and 14 deletions

db/flush_job.h

@@ -206,6 +206,7 @@ class FlushJob {
   // Variables below are set by PickMemTable():
   FileMetaData meta_;
   // Memtables to be flushed by this job.
+  // Ordered by increasing memtable id, i.e., oldest memtable first.
   autovector<ReadOnlyMemTable*> mems_;
   VersionEdit* edit_;
   Version* base_;

db/memtable_list.cc

@@ -553,7 +553,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
   // (in that order) that have finished flushing. Memtables
   // are always committed in the order that they were created.
   uint64_t batch_file_number = 0;
-  size_t batch_count = 0;
   autovector<VersionEdit*> edit_list;
   autovector<ReadOnlyMemTable*> memtables_to_flush;
   // enumerate from the last (earliest) element to see how many batch finished
@@ -563,6 +562,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
       break;
     }
     if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
+      // Oldest memtable in a new batch.
       batch_file_number = m->file_number_;
       if (m->edit_.GetBlobFileAdditions().empty()) {
         ROCKS_LOG_BUFFER(log_buffer,
@@ -578,17 +578,17 @@ Status MemTableList::TryInstallMemtableFlushResults(
       }
       edit_list.push_back(&m->edit_);
-      memtables_to_flush.push_back(m);
       std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
       if (info != nullptr) {
         committed_flush_jobs_info->push_back(std::move(info));
       }
     }
-    batch_count++;
+    memtables_to_flush.push_back(m);
   }
+  size_t num_mem_to_flush = memtables_to_flush.size();
   // TODO(myabandeh): Not sure how batch_count could be 0 here.
-  if (batch_count > 0) {
+  if (num_mem_to_flush > 0) {
     VersionEdit edit;
 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
     if (memtables_to_flush.size() == memlist.size()) {
@@ -612,9 +612,9 @@ Status MemTableList::TryInstallMemtableFlushResults(
                                  nullptr);
     edit_list.push_back(&edit);
-    const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
+    const auto manifest_write_cb = [this, cfd, num_mem_to_flush, log_buffer,
                                     to_delete, mu](const Status& status) {
-      RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
+      RemoveMemTablesOrRestoreFlags(status, cfd, num_mem_to_flush, log_buffer,
                                     to_delete, mu);
     };
     if (write_edits) {
@@ -627,7 +627,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
     // If write_edit is false (e.g: successful mempurge),
     // then remove old memtables, wake up manifest write queue threads,
     // and don't commit anything to the manifest file.
-    RemoveMemTablesOrRestoreFlags(s, cfd, batch_count, log_buffer,
+    RemoveMemTablesOrRestoreFlags(s, cfd, num_mem_to_flush, log_buffer,
                                   to_delete, mu);
     // Note: cfd->SetLogNumber is only called when a VersionEdit
     // is written to MANIFEST. When mempurge is succesful, we skip
@@ -735,7 +735,7 @@ void MemTableList::InstallNewVersion() {
 }
 
 void MemTableList::RemoveMemTablesOrRestoreFlags(
-    const Status& s, ColumnFamilyData* cfd, size_t batch_count,
+    const Status& s, ColumnFamilyData* cfd, size_t num_mem_to_flush,
     LogBuffer* log_buffer, autovector<ReadOnlyMemTable*>* to_delete,
     InstrumentedMutex* mu) {
   assert(mu);
@@ -764,8 +764,11 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
   // read full data as long as column family handle is not deleted, even if
   // the column family is dropped.
   if (s.ok() && !cfd->IsDropped()) {  // commit new state
-    while (batch_count-- > 0) {
+    while (num_mem_to_flush-- > 0) {
       ReadOnlyMemTable* m = current_->memlist_.back();
+      // TODO: The logging can be redundant when we flush multiple memtables
+      // into one SST file. We should only check the edit_ of the oldest
+      // memtable in the group in that case.
       if (m->edit_.GetBlobFileAdditions().empty()) {
         ROCKS_LOG_BUFFER(log_buffer,
                          "[%s] Level-0 commit flush result of table #%" PRIu64
@@ -787,7 +790,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
       ++mem_id;
     }
   } else {
-    for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
+    for (auto it = current_->memlist_.rbegin(); num_mem_to_flush-- > 0; ++it) {
       ReadOnlyMemTable* m = *it;
       // commit failed. setup state so that we can flush again.
       if (m->edit_.GetBlobFileAdditions().empty()) {
@@ -816,7 +819,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
 }
 
 uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
-    const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) {
+    const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) const {
   uint64_t min_log = 0;
   for (auto& m : current_->memlist_) {

db/memtable_list.h

@@ -395,11 +395,13 @@ class MemTableList {
   size_t* current_memory_usage() { return &current_memory_usage_; }
 
-  // Returns the min log containing the prep section after memtables listsed in
-  // `memtables_to_flush` are flushed and their status is persisted in manifest.
+  // Returns the WAL number of the oldest WAL that contains a prepared
+  // transaction that corresponds to the content in this MemTableList,
+  // after memtables listed in `memtables_to_flush` are flushed and their
+  // status is persisted in manifest.
   uint64_t PrecomputeMinLogContainingPrepSection(
       const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush =
-          nullptr);
+          nullptr) const;
 
   uint64_t GetEarliestMemTableID() const {
     auto& memlist = current_->memlist_;

unreleased_history/bug_fixes/ (new file)

@@ -0,0 +1 @@
+* Fix a bug for transaction db with 2pc where an old WAL may be retained longer than needed (#13127).

utilities/transactions/transaction_test.cc

@@ -2202,6 +2202,46 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
   delete cfa;
   delete cfb;
 }
 
+TEST_P(TransactionTest, TwoPhaseLogMultiMemtableFlush) {
+  // Test that min log number to keep is tracked correctly when
+  // multiple memtables are flushed together.
+  DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
+  // So that two immutable memtables won't stall writes.
+  ASSERT_OK(db->SetOptions({{"max_write_buffer_number", "4"}}));
+  // Pause flush.
+  ASSERT_OK(db->PauseBackgroundWork());
+
+  WriteOptions wopts;
+  wopts.disableWAL = false;
+  wopts.sync = true;
+  TransactionOptions topts;
+
+  Transaction* txn1 = db->BeginTransaction(wopts, topts);
+  ASSERT_OK(txn1->Put("key1", "val1"));
+  ASSERT_OK(txn1->SetName("xid1"));
+  ASSERT_OK(txn1->Prepare());
+  ASSERT_OK(txn1->Commit());
+  delete txn1;
+  ASSERT_OK(db_impl->TEST_SwitchMemtable());
+
+  Transaction* txn2 = db->BeginTransaction(wopts, topts);
+  ASSERT_OK(txn2->Put("key2", "val2"));
+  ASSERT_OK(txn2->SetName("xid2"));
+  ASSERT_OK(txn2->Prepare());
+  ASSERT_OK(txn2->Commit());
+  delete txn2;
+  ASSERT_OK(db_impl->TEST_SwitchMemtable());
+
+  ASSERT_OK(db->ContinueBackgroundWork());
+  ASSERT_OK(db->Flush({}));
+  uint64_t cur_wal_num = db_impl->TEST_GetCurrentLogNumber();
+  // All non-active WALs should be obsolete.
+  ASSERT_EQ(cur_wal_num, db_impl->MinLogNumberToKeep());
+}
+
 /*
  * 1) use prepare to keep first log around to determine starting sequence
  * during recovery.