From 80a59a03a7d63a913b7a5558cf6d342533517a33 Mon Sep 17 00:00:00 2001 From: David Devecsery Date: Mon, 7 Jun 2021 11:40:31 -0700 Subject: [PATCH] Cancel compact range (#8351) Summary: Added the ability to cancel an in-progress range compaction by storing to an atomic "canceled" variable pointed to within the CompactRangeOptions structure. Tested via two tests added to db_tests2.cc. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8351 Reviewed By: ajkr Differential Revision: D28808894 Pulled By: ddevec fbshipit-source-id: cb321361c9e23b084b188bb203f11c375a22c2dd --- HISTORY.md | 1 + db/builder.cc | 3 +- db/compaction/compaction_iterator.cc | 6 +- db/compaction/compaction_iterator.h | 71 ++++----- db/compaction/compaction_iterator_test.cc | 3 +- db/compaction/compaction_job.cc | 20 ++- db/compaction/compaction_job.h | 2 + db/compaction/compaction_job_test.cc | 3 +- db/db_impl/db_impl.h | 19 +-- db/db_impl/db_impl_compaction_flush.cc | 16 +- db/db_test2.cc | 175 ++++++++++++++++++++++ include/rocksdb/options.h | 3 + 12 files changed, 265 insertions(+), 57 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e96b6264f3..604b096735 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -26,6 +26,7 @@ * Add an experimental Remote Compaction feature, which allows the user to run Compaction on a different host or process. The feature is still under development, currently only works on some basic use cases. The interface will be changed without backward/forward compatibility support. * RocksDB would validate total entries read in flush, and compare with counter inserted into it. If flush_verify_memtable_count = true (default), flush will fail. Otherwise, only log to info logs. * Add `TableProperties::num_filter_entries`, which can be used with `TableProperties::filter_size` to calculate the effective bits per filter entry (unique user key or prefix) for a table file. +* Added a `cancel` field to `CompactRangeOptions`, allowing individual in-process manual range compactions to be cancelled. ### Performance Improvements * BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take in account read pattern if reads are sequential. This would disable prefetching for random reads in MultiGet and iterators as readahead_size is increased exponential doing large prefetches. diff --git a/db/builder.cc b/db/builder.cc index 6314ea5895..b160ccea7f 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -191,7 +191,8 @@ Status BuildTable( /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, - db_options.info_log, full_history_ts_low); + /*manual_compaction_canceled=*/nullptr, db_options.info_log, + full_history_ts_low); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 11db69fb4c..e48818fd08 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -45,6 +45,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : CompactionIterator( @@ -55,7 +56,8 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - manual_compaction_paused, info_log, full_history_ts_low) {} + manual_compaction_paused, manual_compaction_canceled, info_log, + full_history_ts_low) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -70,6 +72,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, const std::shared_ptr info_log, const std::string* full_history_ts_low) : input_( @@ -91,6 +94,7 @@ CompactionIterator::CompactionIterator( compaction_filter_(compaction_filter), shutting_down_(shutting_down), manual_compaction_paused_(manual_compaction_paused), + manual_compaction_canceled_(manual_compaction_canceled), preserve_deletes_seqnum_(preserve_deletes_seqnum), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 6164342530..7c459a7671 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -150,40 +150,40 @@ class CompactionIterator { const Compaction* compaction_; }; - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - const Compaction* compaction = nullptr, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + const Compaction* compaction = nullptr, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0, + const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); // Constructor with custom CompactionProxy, used for tests. - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - std::unique_ptr compaction, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, + bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const SequenceNumber preserve_deletes_seqnum = 0, + const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); ~CompactionIterator(); @@ -303,6 +303,7 @@ class CompactionIterator { const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; const std::atomic* manual_compaction_paused_; + const std::atomic* manual_compaction_canceled_; const SequenceNumber preserve_deletes_seqnum_; bool bottommost_level_; bool valid_ = false; @@ -399,8 +400,10 @@ class CompactionIterator { bool IsPausingManualCompaction() { // This is a best-effort facility, so memory_order_relaxed is sufficient. - return manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0; + return (manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || + (manual_compaction_canceled_ && + manual_compaction_canceled_->load(std::memory_order_relaxed)); } }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index e14e5ec12d..fef7b54178 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -282,7 +282,8 @@ class CompactionIteratorTest : public testing::TestWithParam { range_del_agg_.get(), nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, std::move(compaction), filter, &shutting_down_, /*preserve_deletes_seqnum=*/0, - /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr, + /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr, full_history_ts_low)); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index c1819af10c..1fb2c63e2c 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -313,9 +313,10 @@ CompactionJob::CompactionJob( EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::atomic* manual_compaction_paused, const std::string& db_id, - const std::string& db_session_id, std::string full_history_ts_low, - BlobFileCompletionCallback* blob_callback) + const std::atomic* manual_compaction_paused, + const std::atomic* manual_compaction_canceled, + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), @@ -339,6 +340,7 @@ CompactionJob::CompactionJob( versions_(versions), shutting_down_(shutting_down), manual_compaction_paused_(manual_compaction_paused), + manual_compaction_canceled_(manual_compaction_canceled), preserve_deletes_seqnum_(preserve_deletes_seqnum), db_directory_(db_directory), blob_output_directory_(blob_output_directory), @@ -1172,8 +1174,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log, - full_history_ts_low)); + preserve_deletes_seqnum_, manual_compaction_paused_, + manual_compaction_canceled_, db_options_.info_log, full_history_ts_low)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -1317,8 +1319,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { status = Status::ShutdownInProgress("Database shutdown"); } if ((status.ok() || status.IsColumnFamilyDropped()) && - (manual_compaction_paused_ && - manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) { + ((manual_compaction_paused_ && + manual_compaction_paused_->load(std::memory_order_relaxed) > 0) || + (manual_compaction_canceled_ && + manual_compaction_canceled_->load(std::memory_order_relaxed)))) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } if (status.ok()) { @@ -2126,7 +2130,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, - nullptr, db_id, db_session_id, + nullptr, nullptr, db_id, db_session_id, compaction->column_family_data()->GetFullHistoryTsLow()), output_path_(output_path), compaction_input_(compaction_service_input), diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 197f7e93b7..0f71fd57b8 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -81,6 +81,7 @@ class CompactionJob { const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused = nullptr, + const std::atomic* manual_compaction_canceled = nullptr, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", BlobFileCompletionCallback* blob_callback = nullptr); @@ -185,6 +186,7 @@ class CompactionJob { VersionSet* versions_; const std::atomic* shutting_down_; const std::atomic* manual_compaction_paused_; + const std::atomic* manual_compaction_canceled_; const SequenceNumber preserve_deletes_seqnum_; FSDirectory* db_directory_; FSDirectory* blob_output_directory_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 062aa7d156..e7f985b380 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -350,7 +350,8 @@ class CompactionJobTestBase : public testing::Test { earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, - /*manual_compaction_paused=*/nullptr, /*db_id=*/"", + /*manual_compaction_paused=*/nullptr, + /*manual_compaction_canceled=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 00210d6bbc..d787f66f4c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1454,15 +1454,16 @@ class DBImpl : public DB { uint32_t output_path_id; Status status; bool done; - bool in_progress; // compaction request being processed? - bool incomplete; // only part of requested range compacted - bool exclusive; // current behavior of only one manual - bool disallow_trivial_move; // Force actual compaction to run - const InternalKey* begin; // nullptr means beginning of key range - const InternalKey* end; // nullptr means end of key range - InternalKey* manual_end; // how far we are compacting - InternalKey tmp_storage; // Used to keep track of compaction progress - InternalKey tmp_storage1; // Used to keep track of compaction progress + bool in_progress; // compaction request being processed? + bool incomplete; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run + const InternalKey* begin; // nullptr means beginning of key range + const InternalKey* end; // nullptr means end of key range + InternalKey* manual_end; // how far we are compacting + InternalKey tmp_storage; // Used to keep track of compaction progress + InternalKey tmp_storage1; // Used to keep track of compaction progress + std::atomic* canceled; // Compaction canceled by the user? }; struct PrepickedCompaction { // background compaction takes ownership of `compaction`. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 6ec2f1c63a..ec876d91ec 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -807,6 +807,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, return Status::Incomplete(Status::SubCode::kManualCompactionPaused); } + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } + const Comparator* const ucmp = column_family->GetComparator(); assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); @@ -1253,7 +1257,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, - &manual_compaction_paused_, db_id_, db_session_id_, + &manual_compaction_paused_, nullptr, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); // Creating a compaction influences the compaction score because the score @@ -1426,10 +1430,13 @@ void DBImpl::NotifyOnCompactionCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } + // TODO: Should disabling manual compaction squash compaction completed + // notifications that aren't the result of a shutdown? if (c->is_manual_compaction() && manual_compaction_paused_.load(std::memory_order_acquire) > 0) { return; } + Version* current = cfd->current(); current->Ref(); // release lock while notifying events @@ -1654,6 +1661,7 @@ Status DBImpl::RunManualCompaction( manual.incomplete = false; manual.exclusive = exclusive; manual.disallow_trivial_move = disallow_trivial_move; + manual.canceled = compact_range_options.canceled; // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || @@ -2819,6 +2827,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else if (is_manual && manual_compaction_paused_.load(std::memory_order_acquire) > 0) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } else if (is_manual && manual_compaction->canceled && + manual_compaction->canceled->load(std::memory_order_acquire)) { + status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); } } else { status = error_handler_.GetBGError(); @@ -3140,7 +3151,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, - is_manual ? &manual_compaction_paused_ : nullptr, db_id_, + is_manual ? &manual_compaction_paused_ : nullptr, + is_manual ? manual_compaction->canceled : nullptr, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); compaction_job.Prepare(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 5f87955e03..f209cdd038 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "db/db_test_util.h" #include "db/read_callback.h" @@ -3201,6 +3202,180 @@ TEST_F(DBTest2, PausingManualCompaction4) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest2, CancelManualCompaction1) { + CompactRangeOptions compact_options; + auto canceledPtr = + std::unique_ptr>(new std::atomic{true}); + compact_options.canceled = canceledPtr.get(); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels - i + 1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels - i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + int run_manual_compactions = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():PausingManualCompaction:1", + [&](void* /*arg*/) { run_manual_compactions++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Setup a callback to disable compactions after a couple of levels are + // compacted + int compactions_run = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", + [&](void* /*arg*/) { ++compactions_run; }); + + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + // Since compactions are disabled, we shouldn't start compacting. + // E.g. we should call the compaction function exactly one time. + ASSERT_EQ(compactions_run, 0); + ASSERT_EQ(run_manual_compactions, 0); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + compactions_run = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) { + ++compactions_run; + // After 3 compactions disable + if (compactions_run == 3) { + compact_options.canceled->store(true, std::memory_order_release); + } + }); + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(compactions_run, 3); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + + // Compactions should work again if we re-enable them.. + compact_options.canceled->store(false, std::memory_order_relaxed); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, CancelManualCompaction2) { + CompactRangeOptions compact_options; + auto canceledPtr = + std::unique_ptr>(new std::atomic{true}); + compact_options.canceled = canceledPtr.get(); + compact_options.max_subcompactions = 1; + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.num_levels = 7; + + Random rnd(301); + auto generate_files = [&]() { + for (int i = 0; i < options.num_levels; i++) { + for (int j = 0; j < options.num_levels - i + 1; j++) { + for (int k = 0; k < 1000; k++) { + ASSERT_OK(Put(Key(k + j * 1000), rnd.RandomString(50))); + } + Flush(); + } + + for (int l = 1; l < options.num_levels - i; l++) { + MoveFilesToLevel(l); + } + } + }; + + DestroyAndReopen(options); + generate_files(); +#ifndef ROCKSDB_LITE + ASSERT_EQ("2,3,4,5,6,7,8", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + int compactions_run = 0; + std::atomic kv_compactions{0}; + int compactions_stopped_at = 0; + int kv_compactions_stopped_at = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::RunManualCompaction()::1", [&](void* /*arg*/) { + ++compactions_run; + // After 3 compactions disable + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator:ProcessKV", [&](void* /*arg*/) { + int kv_compactions_run = + kv_compactions.fetch_add(1, std::memory_order_release); + if (kv_compactions_run == 5) { + compact_options.canceled->store(true, std::memory_order_release); + kv_compactions_stopped_at = kv_compactions_run; + compactions_stopped_at = compactions_run; + } + }); + + compact_options.canceled->store(false, std::memory_order_release); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); + + // NOTE: as we set compact_options.max_subcompacitons = 1, and store true to + // the canceled variable from the single compacting thread (via callback), + // this value is deterministically kv_compactions_stopped_at + 1. + ASSERT_EQ(kv_compactions, kv_compactions_stopped_at + 1); + ASSERT_EQ(compactions_run, compactions_stopped_at); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionIterator::ProcessKV"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::RunManualCompaction()::1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "CompactionJob::Run():PausingManualCompaction:1"); + + // Compactions should work again if we re-enable them.. + compact_options.canceled->store(false, std::memory_order_relaxed); + dbfull()->CompactRange(compact_options, nullptr, nullptr); + dbfull()->TEST_WaitForCompact(true); +#ifndef ROCKSDB_LITE + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); +#endif // !ROCKSDB_LITE + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest2, OptimizeForPointLookup) { Options options = CurrentOptions(); Close(); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1669997bd1..f5f3c93633 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1656,6 +1656,9 @@ struct CompactRangeOptions { // Set user-defined timestamp low bound, the data with older timestamp than // low bound maybe GCed by compaction. Default: nullptr Slice* full_history_ts_low = nullptr; + + // Allows cancellation of an in-progress manual compaction. + std::atomic* canceled = nullptr; }; // IngestExternalFileOptions is used by IngestExternalFile()