diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c3451f78cf..05cc0e3c8b 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6082,6 +6082,14 @@ TEST_F(DBCompactionTest, CompactionLimiter) { } }); + std::vector pending_compaction_cfs; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SchedulePendingCompaction::cfd", [&](void* arg) { + const std::string& cf_name = + static_cast(arg)->GetName(); + pending_compaction_cfs.emplace_back(cf_name); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // Block all compact threads in thread pool. @@ -6136,7 +6144,29 @@ TEST_F(DBCompactionTest, CompactionLimiter) { } // All CFs are pending compaction - ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW)); + unsigned int tp_len = env_->GetThreadPoolQueueLen(Env::LOW); + if (cf_count != tp_len) { + // The test is flaky and fails the assertion below. + // Print some debug information. + uint64_t num_running_flushes = 0; + if (db_->GetIntProperty(DB::Properties::kNumRunningFlushes, + &num_running_flushes)) { + fprintf(stdout, "Running flushes: %" PRIu64 "\n", num_running_flushes); + } + fprintf(stdout, + "%zu CF in compaction queue: ", pending_compaction_cfs.size()); + for (const auto& cf_name : pending_compaction_cfs) { + fprintf(stdout, "%s, ", cf_name.c_str()); + } + fprintf(stdout, "\n"); + + // print lsm + for (unsigned int cf = 0; cf < cf_count; cf++) { + fprintf(stdout, "%s: %s\n", cf_names[cf], FilesPerLevel(cf).c_str()); + } + } + + ASSERT_EQ(cf_count, tp_len); // Unblock all compaction threads for (size_t i = 0; i < kTotalCompactTasks; i++) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index ad88c8d09a..d7dcc06c98 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3042,6 +3042,8 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { return; } if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { + TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd", + static_cast(cfd)); AddToCompactionQueue(cfd); ++unscheduled_compactions_; }