From c76778e2bd920265a9a025b60978992049621756 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Thu, 1 Jul 2021 19:17:21 -0700 Subject: [PATCH] Call OnCompactionCompleted API in case of DisableManualCompaction (#8469) Summary: Call OnCompactionCompleted API in case of DisableManualCompaction() with updated Status::Incomplete Pull Request resolved: https://github.com/facebook/rocksdb/pull/8469 Reviewed By: ajkr Differential Revision: D29475517 Pulled By: akankshamahajan15 fbshipit-source-id: a1726c5e6ee18c0b5097ea04f5e6975fbe108055 --- HISTORY.md | 1 + db/compaction/compaction.cc | 3 +- db/compaction/compaction.h | 14 ++ db/db_impl/db_impl_compaction_flush.cc | 8 +- db/db_test2.cc | 170 +++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 5 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 7893629dc6..d335be8cb3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### Bug Fixes * Blob file checksums are now printed in hexadecimal format when using the `manifest_dump` `ldb` command. * `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown. +* Fix mismatches of OnCompaction{Begin,Completed} in case of DisableManualCompaction(). ### New Features * ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to. diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 10de6a6ea6..f2de4f0e86 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -237,7 +237,8 @@ Compaction::Compaction( is_full_compaction_(IsFullCompaction(vstorage, inputs_)), is_manual_compaction_(_manual_compaction), is_trivial_move_(false), - compaction_reason_(_compaction_reason) { + compaction_reason_(_compaction_reason), + notify_on_compaction_completion_(false) { MarkFilesBeingCompacted(true); if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 070657d5cc..7854c1c7a9 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -303,6 +303,16 @@ class Compaction { uint64_t MinInputFileOldestAncesterTime() const; + // Called by DBImpl::NotifyOnCompactionCompleted to make sure number of + // compaction begin and compaction completion callbacks match. + void SetNotifyOnCompactionCompleted() { + notify_on_compaction_completion_ = true; + } + + bool ShouldNotifyOnCompactionCompleted() const { + return notify_on_compaction_completion_; + } + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); @@ -386,6 +396,10 @@ class Compaction { // Reason for compaction CompactionReason compaction_reason_; + + // Notify on compaction completion only if listener was notified on compaction + // begin. + bool notify_on_compaction_completion_; }; // Return sum of sizes of all files in `files`. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d583bebc15..9c85aa7733 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1395,6 +1395,8 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, manual_compaction_paused_.load(std::memory_order_acquire) > 0) { return; } + + c->SetNotifyOnCompactionCompleted(); Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1430,10 +1432,8 @@ void DBImpl::NotifyOnCompactionCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } - // TODO: Should disabling manual compaction squash compaction completed - // notifications that aren't the result of a shutdown? - if (c->is_manual_compaction() && - manual_compaction_paused_.load(std::memory_order_acquire) > 0) { + + if (c->ShouldNotifyOnCompactionCompleted() == false) { return; } diff --git a/db/db_test2.cc b/db/db_test2.cc index f209cdd038..31854d4cd5 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3376,6 +3376,176 @@ TEST_F(DBTest2, CancelManualCompaction2) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +class CancelCompactionListener : public EventListener { + public: + CancelCompactionListener() + : num_compaction_started_(0), num_compaction_ended_(0) {} + + void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override { + ASSERT_EQ(ci.cf_name, "default"); + ASSERT_EQ(ci.base_input_level, 0); + num_compaction_started_++; + } + + void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override { + ASSERT_EQ(ci.cf_name, "default"); + ASSERT_EQ(ci.base_input_level, 0); + ASSERT_EQ(ci.status.code(), code_); + ASSERT_EQ(ci.status.subcode(), subcode_); + num_compaction_ended_++; + } + + std::atomic num_compaction_started_; + std::atomic num_compaction_ended_; + Status::Code code_; + Status::SubCode subcode_; +}; + +TEST_F(DBTest2, CancelManualCompactionWithListener) { + CompactRangeOptions compact_options; + auto canceledPtr = + std::unique_ptr>(new std::atomic{true}); + compact_options.canceled = canceledPtr.get(); + compact_options.max_subcompactions = 1; + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + CancelCompactionListener* listener = new CancelCompactionListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(Key(i + j * 10), rnd.RandomString(50))); + } + Flush(); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator:ProcessKV", [&](void* /*arg*/) { + compact_options.canceled->store(true, std::memory_order_release); + }); + + int running_compaction = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::FinishCompactionOutputFile1", + [&](void* /*arg*/) { running_compaction++; }); + + // Case I: 1 Notify begin compaction, 2 DisableManualCompaction, 3 Compaction + // not run, 4 Notify compaction end. + listener->code_ = Status::kIncomplete; + listener->subcode_ = Status::SubCode::kManualCompactionPaused; + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_GT(listener->num_compaction_started_, 0); + ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_); + ASSERT_EQ(running_compaction, 0); + + listener->num_compaction_started_ = 0; + listener->num_compaction_ended_ = 0; + + // Case II: 1 DisableManualCompaction, 2 Notify begin compaction (return + // without notifying), 3 Notify compaction end (return without notifying). + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(listener->num_compaction_started_, 0); + ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_); + ASSERT_EQ(running_compaction, 0); + + // Case III: 1 Notify begin compaction, 2 Compaction in between + // 3. DisableManualCompaction, , 4 Notify compaction end. + // compact_options.canceled->store(false, std::memory_order_release); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionIterator:ProcessKV"); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run:BeforeVerify", [&](void* /*arg*/) { + compact_options.canceled->store(true, std::memory_order_release); + }); + + listener->code_ = Status::kOk; + listener->subcode_ = Status::SubCode::kNone; + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_GT(listener->num_compaction_started_, 0); + ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_); + + // Compaction job will succeed. + ASSERT_GT(running_compaction, 0); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, CompactionOnBottomPriorityWithListener) { + int num_levels = 3; + const int kNumFilesTrigger = 4; + + Options options = CurrentOptions(); + env_->SetBackgroundThreads(0, Env::Priority::HIGH); + env_->SetBackgroundThreads(0, Env::Priority::LOW); + env_->SetBackgroundThreads(1, Env::Priority::BOTTOM); + options.env = env_; + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = num_levels; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB + options.level0_file_num_compaction_trigger = kNumFilesTrigger; + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + + CancelCompactionListener* listener = new CancelCompactionListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + + int num_bottom_thread_compaction_scheduled = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:ForwardToBottomPriPool", + [&](void* /*arg*/) { num_bottom_thread_compaction_scheduled++; }); + + int num_compaction_jobs = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():End", + [&](void* /*arg*/) { num_compaction_jobs++; }); + + listener->code_ = Status::kOk; + listener->subcode_ = Status::SubCode::kNone; + + Random rnd(301); + for (int i = 0; i < 1; ++i) { + for (int num = 0; num < kNumFilesTrigger; num++) { + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx, true /* no_wait */); + // use no_wait above because that one waits for flush and compaction. We + // don't want to wait for compaction because the full compaction is + // intentionally blocked while more files are flushed. + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + } + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_GT(num_bottom_thread_compaction_scheduled, 0); + ASSERT_EQ(num_compaction_jobs, 1); + ASSERT_GT(listener->num_compaction_started_, 0); + ASSERT_EQ(listener->num_compaction_started_, listener->num_compaction_ended_); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest2, OptimizeForPointLookup) { Options options = CurrentOptions(); Close();