diff --git a/HISTORY.md b/HISTORY.md index 5430d20cad..28ae4eac9b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### Bug Fixes +* Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes. ## 7.10.0 (01/23/2023) ### Behavior changes diff --git a/db/column_family.cc b/db/column_family.cc index 8124b23cd7..3a1d22c3df 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -557,7 +557,6 @@ ColumnFamilyData::ColumnFamilyData( next_(nullptr), prev_(nullptr), log_number_(0), - flush_reason_(FlushReason::kOthers), column_family_set_(column_family_set), queued_for_flush_(false), queued_for_compaction_(false), diff --git a/db/column_family.h b/db/column_family.h index ff4eca514b..0c696ed4e2 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -310,10 +310,6 @@ class ColumnFamilyData { void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } - void SetFlushReason(FlushReason flush_reason) { - flush_reason_ = flush_reason; - } - FlushReason GetFlushReason() const { return flush_reason_; } // thread-safe const FileOptions* soptions() const; const ImmutableOptions* ioptions() const { return &ioptions_; } @@ -616,8 +612,6 @@ class ColumnFamilyData { // recovered from uint64_t log_number_; - std::atomic flush_reason_; - // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_; diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 3b3f7e1836..0587afb3a5 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -746,6 +746,64 @@ class TestFlushListener : public EventListener { }; #endif // !ROCKSDB_LITE +// RocksDB lite does not support GetLiveFiles() +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) { + Options options = CurrentOptions(); + options.atomic_flush = true; + options.disable_auto_compactions = true; + CreateAndReopenWithCF({"cf1"}, options); + + for (int idx = 0; idx < 1; ++idx) { + ASSERT_OK(Put(0, Key(idx), std::string(1, 'v'))); + ASSERT_OK(Put(1, Key(idx), std::string(1, 'v'))); + } + + // To coerce a manual flush happenning in the middle of GetLiveFiles's flush, + // we need to pause background flush thread and enable it later. + std::shared_ptr sleeping_task = + std::make_shared(); + env_->SetBackgroundThreads(1, Env::HIGH); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + sleeping_task.get(), Env::Priority::HIGH); + sleeping_task->WaitUntilSleeping(); + + // Coerce a manual flush happenning in the middle of GetLiveFiles's flush + bool get_live_files_paused_at_sync_point = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void* /* arg */) { + if (get_live_files_paused_at_sync_point) { + // To prevent non-GetLiveFiles() flush from pausing at this sync point + return; + } + get_live_files_paused_at_sync_point = true; + + FlushOptions fo; + fo.wait = false; + fo.allow_write_stall = true; + ASSERT_OK(dbfull()->Flush(fo)); + + // Resume background flush thread so GetLiveFiles() can finish + sleeping_task->WakeUp(); + sleeping_task->WaitUntilDone(); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector files; + uint64_t manifest_file_size; + // Before the fix, a race condition on default cf's flush reason due to + // concurrent GetLiveFiles's flush and manual flush will fail + // an internal assertion. + // After the fix, such race condition is fixed and there is no assertion + // failure. + ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true)); + ASSERT_TRUE(get_live_files_paused_at_sync_point); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} +#endif // !ROCKSDB_LITE + TEST_F(DBFlushTest, MemPurgeBasic) { Options options = CurrentOptions(); @@ -2440,7 +2498,9 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { options.atomic_flush = GetParam(); // 64MB so that memtable flush won't be trigger by the small writes. options.write_buffer_size = (static_cast(64) << 20); - + auto flush_listener = std::make_shared(); + flush_listener->expected_flush_reason = FlushReason::kManualFlush; + options.listeners.push_back(flush_listener); // Destroy the DB to recreate as a TransactionDB. Close(); Destroy(options, true); @@ -2507,7 +2567,6 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { auto cfh = static_cast(handles_[i]); ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); - ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush); } // The recovered min log number with prepared data should be non-zero. @@ -2520,13 +2579,15 @@ TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { ASSERT_TRUE(db_impl->allow_2pc()); ASSERT_NE(db_impl->MinLogNumberToKeep(), 0); } -#endif // ROCKSDB_LITE TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; options.atomic_flush = GetParam(); options.write_buffer_size = (static_cast(64) << 20); + auto flush_listener = std::make_shared(); + flush_listener->expected_flush_reason = FlushReason::kManualFlush; + options.listeners.push_back(flush_listener); CreateAndReopenWithCF({"pikachu", "eevee"}, options); size_t num_cfs = handles_.size(); @@ -2551,11 +2612,11 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { for (size_t i = 0; i != num_cfs; ++i) { auto cfh = static_cast(handles_[i]); - ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush); ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); } } +#endif // ROCKSDB_LITE TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) { Options options = CurrentOptions(); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f930878dee..24fb538906 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -604,7 +604,7 @@ Status DBImpl::CloseHelper() { while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); - for (const auto& iter : flush_req) { + for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { iter.first->UnrefAndTryDelete(); } } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9fcd9efea2..e56f97ecec 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -1383,7 +1384,7 @@ class DBImpl : public DB { void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id); + int job_id, FlushReason flush_reason); void NotifyOnFlushCompleted( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, @@ -1675,12 +1676,17 @@ class DBImpl : public DB { // Argument required by background flush thread. struct BGFlushArg { BGFlushArg() - : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {} + : cfd_(nullptr), + max_memtable_id_(0), + superversion_context_(nullptr), + flush_reason_(FlushReason::kOthers) {} BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id, - SuperVersionContext* superversion_context) + SuperVersionContext* superversion_context, + FlushReason flush_reason) : cfd_(cfd), max_memtable_id_(max_memtable_id), - superversion_context_(superversion_context) {} + superversion_context_(superversion_context), + flush_reason_(flush_reason) {} // Column family to flush. ColumnFamilyData* cfd_; @@ -1691,6 +1697,7 @@ class DBImpl : public DB { // installs a new superversion for the column family. This operation // requires a SuperVersionContext object (currently embedded in JobContext). SuperVersionContext* superversion_context_; + FlushReason flush_reason_; }; // Argument passed to flush thread. @@ -1819,7 +1826,7 @@ class DBImpl : public DB { // installs a new super version for the column family. Status FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* madeProgress, JobContext* job_context, + bool* madeProgress, JobContext* job_context, FlushReason flush_reason, SuperVersionContext* superversion_context, std::vector& snapshot_seqs, SequenceNumber earliest_write_conflict_snapshot, @@ -2029,18 +2036,22 @@ class DBImpl : public DB { void MaybeScheduleFlushOrCompaction(); - // A flush request specifies the column families to flush as well as the - // largest memtable id to persist for each column family. Once all the - // memtables whose IDs are smaller than or equal to this per-column-family - // specified value, this flush request is considered to have completed its - // work of flushing this column family. After completing the work for all - // column families in this request, this flush is considered complete. - using FlushRequest = std::vector>; + struct FlushRequest { + FlushReason flush_reason; + // A map from column family to flush to largest memtable id to persist for + // each column family. Once all the memtables whose IDs are smaller than or + // equal to this per-column-family specified value, this flush request is + // considered to have completed its work of flushing this column family. + // After completing the work for all column families in this request, this + // flush is considered complete. + std::unordered_map + cfd_to_max_mem_id_to_persist; + }; void GenerateFlushRequest(const autovector& cfds, - FlushRequest* req); + FlushReason flush_reason, FlushRequest* req); - void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason); + void SchedulePendingFlush(const FlushRequest& req); void SchedulePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, std::string dir_to_sync, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d14b67876f..b52ad58965 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -155,7 +155,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context, Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* made_progress, JobContext* job_context, + bool* made_progress, JobContext* job_context, FlushReason flush_reason, SuperVersionContext* superversion_context, std::vector& snapshot_seqs, SequenceNumber earliest_write_conflict_snapshot, @@ -215,7 +215,8 @@ Status DBImpl::FlushMemTableToOutputFile( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, - job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U), + job_context, flush_reason, log_buffer, directories_.GetDbDir(), + GetDataDir(cfd, 0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, @@ -260,7 +261,8 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, + flush_reason); #endif // ROCKSDB_LITE bool switched_to_mempurge = false; @@ -390,8 +392,9 @@ Status DBImpl::FlushMemTablesToOutputFiles( MutableCFOptions mutable_cf_options_copy = *cfd->GetLatestMutableCFOptions(); SuperVersionContext* superversion_context = bg_flush_arg.superversion_context_; + FlushReason flush_reason = bg_flush_arg.flush_reason_; Status s = FlushMemTableToOutputFile( - cfd, mutable_cf_options_copy, made_progress, job_context, + cfd, mutable_cf_options_copy, made_progress, job_context, flush_reason, superversion_context, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, log_buffer, thread_pri); return s; @@ -420,7 +423,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( for (const auto cfd : cfds) { assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); - assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason()); + } + for (const auto bg_flush_arg : bg_flush_args) { + assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_); } #endif /* !NDEBUG */ @@ -459,13 +464,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_; + FlushReason flush_reason = bg_flush_args[i].flush_reason_; jobs.emplace_back(new FlushJob( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), - data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), - stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, + snapshot_checker, job_context, flush_reason, log_buffer, + directories_.GetDbDir(), data_dir, + GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, + &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_)); @@ -483,8 +490,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( for (int i = 0; i != num_cfs; ++i) { const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); // may temporarily unlock and lock the mutex. + FlushReason flush_reason = bg_flush_args[i].flush_reason_; NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, - job_context->job_id); + job_context->job_id, flush_reason); } #endif /* !ROCKSDB_LITE */ @@ -642,8 +650,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( bool resuming_from_bg_err = error_handler_.IsDBStopped() || - (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery || - cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush); + (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery || + bg_flush_args[0].flush_reason_ == + FlushReason::kErrorRecoveryRetryFlush); while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) { std::pair res = wait_to_install_func(); @@ -660,8 +669,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( resuming_from_bg_err = error_handler_.IsDBStopped() || - (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery || - cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush); + (bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery || + bg_flush_args[0].flush_reason_ == + FlushReason::kErrorRecoveryRetryFlush); } if (!resuming_from_bg_err) { @@ -816,7 +826,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id) { + int job_id, FlushReason flush_reason) { #ifndef ROCKSDB_LITE if (immutable_db_options_.listeners.size() == 0U) { return; @@ -849,7 +859,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.triggered_writes_stop = triggered_writes_stop; info.smallest_seqno = file_meta->fd.smallest_seqno; info.largest_seqno = file_meta->fd.largest_seqno; - info.flush_reason = cfd->GetFlushReason(); + info.flush_reason = flush_reason; for (auto listener : immutable_db_options_.listeners) { listener->OnFlushBegin(this, info); } @@ -862,6 +872,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, (void)file_meta; (void)mutable_cf_options; (void)job_id; + (void)flush_reason; #endif // ROCKSDB_LITE } @@ -2102,16 +2113,17 @@ Status DBImpl::RunManualCompaction( } void DBImpl::GenerateFlushRequest(const autovector& cfds, - FlushRequest* req) { + FlushReason flush_reason, FlushRequest* req) { assert(req != nullptr); - req->reserve(cfds.size()); + req->flush_reason = flush_reason; + req->cfd_to_max_mem_id_to_persist.reserve(cfds.size()); for (const auto cfd : cfds) { if (nullptr == cfd) { // cfd may be null, see DBImpl::ScheduleFlushes continue; } uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID(); - req->emplace_back(cfd, max_memtable_id); + req->cfd_to_max_mem_id_to_persist.emplace(cfd, max_memtable_id); } } @@ -2169,7 +2181,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok()) { if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { - FlushRequest req{{cfd, flush_memtable_id}}; + FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}}; flush_reqs.emplace_back(std::move(req)); memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID()); } @@ -2197,7 +2209,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, "to avoid holding old logs", cfd->GetName().c_str()); s = SwitchMemtable(cfd_stats, &context); - FlushRequest req{{cfd_stats, flush_memtable_id}}; + FlushRequest req{flush_reason, {{cfd_stats, flush_memtable_id}}}; flush_reqs.emplace_back(std::move(req)); memtable_ids_to_wait.emplace_back( cfd_stats->imm()->GetLatestMemTableID()); @@ -2208,8 +2220,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok() && !flush_reqs.empty()) { for (const auto& req : flush_reqs) { - assert(req.size() == 1); - ColumnFamilyData* loop_cfd = req[0].first; + assert(req.cfd_to_max_mem_id_to_persist.size() == 1); + ColumnFamilyData* loop_cfd = + req.cfd_to_max_mem_id_to_persist.begin()->first; loop_cfd->imm()->FlushRequested(); } // If the caller wants to wait for this flush to complete, it indicates @@ -2218,13 +2231,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // Therefore, we increase the cfd's ref count. if (flush_options.wait) { for (const auto& req : flush_reqs) { - assert(req.size() == 1); - ColumnFamilyData* loop_cfd = req[0].first; + assert(req.cfd_to_max_mem_id_to_persist.size() == 1); + ColumnFamilyData* loop_cfd = + req.cfd_to_max_mem_id_to_persist.begin()->first; loop_cfd->Ref(); } } for (const auto& req : flush_reqs) { - SchedulePendingFlush(req, flush_reason); + SchedulePendingFlush(req); } MaybeScheduleFlushOrCompaction(); } @@ -2243,8 +2257,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, autovector flush_memtable_ids; assert(flush_reqs.size() == memtable_ids_to_wait.size()); for (size_t i = 0; i < flush_reqs.size(); ++i) { - assert(flush_reqs[i].size() == 1); - cfds.push_back(flush_reqs[i][0].first); + assert(flush_reqs[i].cfd_to_max_mem_id_to_persist.size() == 1); + cfds.push_back(flush_reqs[i].cfd_to_max_mem_id_to_persist.begin()->first); flush_memtable_ids.push_back(&(memtable_ids_to_wait[i])); } s = WaitForFlushMemTables( @@ -2341,8 +2355,8 @@ Status DBImpl::AtomicFlushMemTables( cfd->Ref(); } } - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, flush_reason); + GenerateFlushRequest(cfds, flush_reason, &flush_req); + SchedulePendingFlush(flush_req); MaybeScheduleFlushOrCompaction(); } @@ -2357,7 +2371,7 @@ Status DBImpl::AtomicFlushMemTables( TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"); if (s.ok() && flush_options.wait) { autovector flush_memtable_ids; - for (auto& iter : flush_req) { + for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { flush_memtable_ids.push_back(&(iter.second)); } s = WaitForFlushMemTables( @@ -2704,9 +2718,9 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { FlushRequest flush_req = flush_queue_.front(); flush_queue_.pop_front(); if (!immutable_db_options_.atomic_flush) { - assert(flush_req.size() == 1); + assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1); } - for (const auto& elem : flush_req) { + for (const auto& elem : flush_req.cfd_to_max_mem_id_to_persist) { if (!immutable_db_options_.atomic_flush) { ColumnFamilyData* cfd = elem.first; assert(cfd); @@ -2714,7 +2728,6 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { cfd->set_queued_for_flush(false); } } - // TODO: need to unset flush reason? return flush_req; } @@ -2744,31 +2757,29 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue( return cfd; } -void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, - FlushReason flush_reason) { +void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) { mutex_.AssertHeld(); - if (flush_req.empty()) { + if (flush_req.cfd_to_max_mem_id_to_persist.empty()) { return; } if (!immutable_db_options_.atomic_flush) { // For the non-atomic flush case, we never schedule multiple column // families in the same flush request. - assert(flush_req.size() == 1); - ColumnFamilyData* cfd = flush_req[0].first; + assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1); + ColumnFamilyData* cfd = + flush_req.cfd_to_max_mem_id_to_persist.begin()->first; assert(cfd); if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { cfd->Ref(); cfd->set_queued_for_flush(true); - cfd->SetFlushReason(flush_reason); ++unscheduled_flushes_; flush_queue_.push_back(flush_req); } } else { - for (auto& iter : flush_req) { + for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { ColumnFamilyData* cfd = iter.first; cfd->Ref(); - cfd->SetFlushReason(flush_reason); } ++unscheduled_flushes_; flush_queue_.push_back(flush_req); @@ -2900,10 +2911,12 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, while (!flush_queue_.empty()) { // This cfd is already referenced const FlushRequest& flush_req = PopFirstFromFlushQueue(); + FlushReason flush_reason = flush_req.flush_reason; superversion_contexts.clear(); - superversion_contexts.reserve(flush_req.size()); + superversion_contexts.reserve( + flush_req.cfd_to_max_mem_id_to_persist.size()); - for (const auto& iter : flush_req) { + for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { ColumnFamilyData* cfd = iter.first; if (cfd->GetMempurgeUsed()) { // If imm() contains silent memtables (e.g.: because @@ -2919,7 +2932,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, } superversion_contexts.emplace_back(SuperVersionContext(true)); bg_flush_args.emplace_back(cfd, iter.second, - &(superversion_contexts.back())); + &(superversion_contexts.back()), flush_reason); } if (!bg_flush_args.empty()) { break; @@ -2943,9 +2956,14 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer, thread_pri); TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush"); - // All the CFDs in the FlushReq must have the same flush reason, so just - // grab the first one - *reason = bg_flush_args[0].cfd_->GetFlushReason(); +// All the CFD/bg_flush_arg in the FlushReq must have the same flush reason, so +// just grab the first one +#ifndef NDEBUG + for (const auto bg_flush_arg : bg_flush_args) { + assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_); + } +#endif /* !NDEBUG */ + *reason = bg_flush_args[0].flush_reason_; for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->UnrefAndTryDelete()) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index cbeab046fd..59c29e4248 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1653,14 +1653,14 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { cfd->imm()->FlushRequested(); if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; - GenerateFlushRequest({cfd}, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWalFull); + GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req); + SchedulePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWalFull); + GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req); + SchedulePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -1744,14 +1744,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { cfd->imm()->FlushRequested(); if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; - GenerateFlushRequest({cfd}, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager, + &flush_req); + SchedulePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req); + SchedulePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -2008,13 +2009,13 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req); + SchedulePendingFlush(flush_req); } else { for (auto* cfd : cfds) { FlushRequest flush_req; - GenerateFlushRequest({cfd}, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req); + SchedulePendingFlush(flush_req); } } MaybeScheduleFlushOrCompaction(); diff --git a/db/flush_job.cc b/db/flush_job.cc index ac84da4cae..e99497a1c6 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -91,7 +91,7 @@ FlushJob::FlushJob( std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, FSDirectory* db_directory, + FlushReason flush_reason, LogBuffer* log_buffer, FSDirectory* db_directory, FSDirectory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, @@ -114,6 +114,7 @@ FlushJob::FlushJob( earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), job_context_(job_context), + flush_reason_(flush_reason), log_buffer_(log_buffer), db_directory_(db_directory), output_file_directory_(output_file_directory), @@ -245,9 +246,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, } Status mempurge_s = Status::NotFound("No MemPurge."); if ((mempurge_threshold > 0.0) && - (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && - (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) && - !(db_options_.atomic_flush)) { + (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) && + MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) { cfd_->SetMempurgeUsed(); mempurge_s = MemPurge(); if (!mempurge_s.ok()) { @@ -878,7 +878,7 @@ Status FlushJob::WriteLevel0Table() { << total_num_deletes << "total_data_size" << total_data_size << "memory_usage" << total_memory_usage << "flush_reason" - << GetFlushReasonString(cfd_->GetFlushReason()); + << GetFlushReasonString(flush_reason_); { ScopedArenaIterator iter( @@ -1076,7 +1076,7 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { info->smallest_seqno = meta_.fd.smallest_seqno; info->largest_seqno = meta_.fd.largest_seqno; info->table_properties = table_properties_; - info->flush_reason = cfd_->GetFlushReason(); + info->flush_reason = flush_reason_; info->blob_compression_type = mutable_cf_options_.blob_compression_type; // Update BlobFilesInfo. diff --git a/db/flush_job.h b/db/flush_job.h index 60c272aec3..062ef29976 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -67,8 +67,8 @@ class FlushJob { std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_file_directory, + FlushReason flush_reason, LogBuffer* log_buffer, + FSDirectory* db_directory, FSDirectory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, @@ -150,6 +150,7 @@ class FlushJob { SequenceNumber earliest_write_conflict_snapshot_; SnapshotChecker* snapshot_checker_; JobContext* job_context_; + FlushReason flush_reason_; LogBuffer* log_buffer_; FSDirectory* db_directory_; FSDirectory* output_file_directory_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index f994b4e9b5..003a1a6570 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -164,15 +164,15 @@ TEST_F(FlushJobTest, Empty) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - std::numeric_limits::max() /* memtable_id */, - env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, kMaxSequenceNumber, snapshot_checker, &job_context, - nullptr, nullptr, nullptr, kNoCompression, nullptr, - &event_logger, false, true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER, - nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), + std::numeric_limits::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger, false, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -255,9 +255,9 @@ TEST_F(FlushJobTest, NonEmpty) { *cfd->GetLatestMutableCFOptions(), std::numeric_limits::max() /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); HistogramData hist; @@ -318,9 +318,9 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); HistogramData hist; FileMetaData file_meta; @@ -391,8 +391,8 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), memtable_ids[k], env_options_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, - &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, + &job_context, FlushReason::kTest, nullptr, nullptr, nullptr, + kNoCompression, db_options_.statistics.get(), &event_logger, true, false /* sync_output_directory */, false /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_)); @@ -520,9 +520,9 @@ TEST_F(FlushJobTest, Snapshots) { *cfd->GetLatestMutableCFOptions(), std::numeric_limits::max() /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); mutex_.Lock(); flush_job.PickMemTable(); @@ -576,9 +576,9 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); // When the state from WriteController is normal. @@ -656,9 +656,9 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) { dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), std::numeric_limits::max() /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low); @@ -709,9 +709,9 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), std::numeric_limits::max() /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low);