From c73cf7a878ee217059b5c207f9ebf8507e9be6f1 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Tue, 18 Jun 2024 10:51:29 -0700 Subject: [PATCH] Add CompactForTieringCollector to support automatically trigger compaction for tiering use case (#12760) Summary: This PR adds user property collector factory `CompactForTieringCollectorFactory` to support observe SST file and mark it as need compaction for fast tracking data to the proper tier. A triggering ratio `compaction_trigger_ratio_` can be configured to achieve the following: 1) Setting the ratio to be equal to or smaller than 0 disables this collector 2) Setting the ratio to be within (0, 1] will write the number of observed eligible entries into a user property and marks a file as need-compaction when aforementioned condition is met. 3) Setting the ratio to be higher than 1 can be used to just writes the user table property, and not mark any file as need compaction. For a column family that does not enable tiering feature, even if an effective configuration is provided, this collector is still disabled. For a file that is already on the last level, this collector is also disabled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12760 Test Plan: Added unit tests Reviewed By: pdillinger Differential Revision: D58734976 Pulled By: jowlyzhang fbshipit-source-id: 6daab2c4f62b5c6689c3c03e3b3907bbbe6b7a81 --- CMakeLists.txt | 2 + Makefile | 3 + TARGETS | 7 + db/builder.cc | 4 +- db/builder.h | 2 +- db/compaction/compaction_iterator.cc | 8 +- db/compaction/compaction_iterator.h | 2 + db/compaction/compaction_iterator_test.cc | 1 + db/compaction/compaction_job.cc | 41 ++--- db/compaction/compaction_job.h | 2 + db/compaction/tiered_compaction_test.cc | 77 +++++++++ db/db_impl/db_impl_open.cc | 28 ++-- db/flush_job.cc | 33 +++- db/flush_job.h | 14 ++ db/repair.cc | 7 +- db/seqno_to_time_mapping.cc | 30 ++++ db/seqno_to_time_mapping.h | 9 ++ db/table_properties_collector.h | 11 +- db/table_properties_collector_test.cc | 4 +- include/rocksdb/table_properties.h | 17 ++ .../utilities/table_properties_collectors.h | 46 ++++++ src.mk | 2 + .../block_based/block_based_table_builder.cc | 6 +- table/plain/plain_table_builder.cc | 4 +- table/sst_file_writer_collectors.h | 4 +- table/table_builder.h | 15 +- .../auto_trigger_compaction_for_tiering.md | 1 + .../compact_for_tiering_collector.cc | 146 ++++++++++++++++++ .../compact_for_tiering_collector.h | 45 ++++++ .../compact_for_tiering_collector_test.cc | 139 +++++++++++++++++ .../compact_on_deletion_collector.cc | 12 ++ .../compact_on_deletion_collector_test.cc | 3 + 32 files changed, 660 insertions(+), 65 deletions(-) create mode 100644 unreleased_history/new_features/auto_trigger_compaction_for_tiering.md create mode 100644 utilities/table_properties_collectors/compact_for_tiering_collector.cc create mode 100644 utilities/table_properties_collectors/compact_for_tiering_collector.h create mode 100644 utilities/table_properties_collectors/compact_for_tiering_collector_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index c2f5ecd011..b4f1747c0d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -939,6 +939,7 @@ set(SOURCES utilities/persistent_cache/volatile_tier_impl.cc utilities/simulator_cache/cache_simulator.cc utilities/simulator_cache/sim_cache.cc + utilities/table_properties_collectors/compact_for_tiering_collector.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/trace/file_trace_reader_writer.cc utilities/trace/replayer_impl.cc @@ -1481,6 +1482,7 @@ if(WITH_TESTS) utilities/persistent_cache/persistent_cache_test.cc utilities/simulator_cache/cache_simulator_test.cc utilities/simulator_cache/sim_cache_test.cc + utilities/table_properties_collectors/compact_for_tiering_collector_test.cc utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc diff --git a/Makefile b/Makefile index fd12560440..1dbaedf811 100644 --- a/Makefile +++ b/Makefile @@ -1642,6 +1642,9 @@ compaction_job_stats_test: $(OBJ_DIR)/db/compaction/compaction_job_stats_test.o compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +compact_for_tiering_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_for_tiering_collector_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index a274da1518..b9cc254fbf 100644 --- a/TARGETS +++ b/TARGETS @@ -317,6 +317,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "utilities/persistent_cache/volatile_tier_impl.cc", "utilities/simulator_cache/cache_simulator.cc", "utilities/simulator_cache/sim_cache.cc", + "utilities/table_properties_collectors/compact_for_tiering_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/trace/file_trace_reader_writer.cc", "utilities/trace/replayer_impl.cc", @@ -4626,6 +4627,12 @@ cpp_unittest_wrapper(name="compact_files_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="compact_for_tiering_collector_test", + srcs=["utilities/table_properties_collectors/compact_for_tiering_collector_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="compact_on_deletion_collector_test", srcs=["utilities/table_properties_collectors/compact_on_deletion_collector_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/builder.cc b/db/builder.cc index d2557a0b0c..5bc75b0603 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -64,7 +64,7 @@ Status BuildTable( std::vector> range_del_iters, FileMetaData* meta, std::vector* blob_file_additions, - std::vector snapshots, + std::vector snapshots, SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, @@ -195,7 +195,7 @@ Status BuildTable( const std::atomic kManualCompactionCanceledFalse{false}; CompactionIterator c_iter( - iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, + iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, earliest_snapshot, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.stats), true /* internal key corruption is not ok */, range_del_agg.get(), diff --git a/db/builder.h b/db/builder.h index f228f8d0fe..08dd5fcab0 100644 --- a/db/builder.h +++ b/db/builder.h @@ -57,7 +57,7 @@ Status BuildTable( std::vector> range_del_iters, FileMetaData* meta, std::vector* blob_file_additions, - std::vector snapshots, + std::vector snapshots, SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 60592489bc..1b69a9ecfd 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -25,6 +25,7 @@ namespace ROCKSDB_NAMESPACE { CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, @@ -40,7 +41,7 @@ CompactionIterator::CompactionIterator( const SequenceNumber preserve_time_min_seqno, const SequenceNumber preclude_last_level_min_seqno) : CompactionIterator( - input, cmp, merge_helper, last_sequence, snapshots, + input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, blob_file_builder, allow_data_in_errors, enforce_single_del_contracts, @@ -54,6 +55,7 @@ CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber /*last_sequence*/, std::vector* snapshots, + SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, @@ -91,9 +93,7 @@ CompactionIterator::CompactionIterator( // snapshots_ cannot be nullptr, but we will assert later in the body of // the constructor. visible_at_tip_(snapshots_ ? snapshots_->empty() : false), - earliest_snapshot_(!snapshots_ || snapshots_->empty() - ? kMaxSequenceNumber - : snapshots_->at(0)), + earliest_snapshot_(earliest_snapshot), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), enforce_single_del_contracts_(enforce_single_del_contracts), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 5c546feca9..39def1ebc1 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -203,6 +203,7 @@ class CompactionIterator { CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, @@ -222,6 +223,7 @@ class CompactionIterator { CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_snapshot, SequenceNumber earliest_write_conflict_snapshot, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index f2fb7e4bb0..e3b4575f56 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -296,6 +296,7 @@ class CompactionIteratorTest : public testing::TestWithParam { iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, + snapshots_.empty() ? kMaxSequenceNumber : snapshots_.at(0), earliest_write_conflict_snapshot, kMaxSequenceNumber, snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 0bdc9fd522..97c1886ff1 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -174,6 +174,9 @@ CompactionJob::CompactionJob( db_mutex_(db_mutex), db_error_handler_(db_error_handler), existing_snapshots_(std::move(existing_snapshots)), + earliest_snapshot_(existing_snapshots_.empty() + ? kMaxSequenceNumber + : existing_snapshots_.at(0)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), job_context_(job_context), @@ -282,6 +285,7 @@ void CompactionJob::Prepare() { // collect all seqno->time information from the input files which will be used // to encode seqno->time to the output files. + uint64_t preserve_time_duration = std::max(c->immutable_options()->preserve_internal_time_seconds, c->immutable_options()->preclude_last_level_data_seconds); @@ -319,28 +323,11 @@ void CompactionJob::Prepare() { seqno_to_time_mapping_.Enforce(); } else { seqno_to_time_mapping_.Enforce(_current_time); - uint64_t preserve_time = - static_cast(_current_time) > preserve_time_duration - ? _current_time - preserve_time_duration - : 0; - // GetProximalSeqnoBeforeTime tells us the last seqno known to have been - // written at or before the given time. + 1 to get the minimum we should - // preserve without excluding anything that might have been written on or - // after the given time. - preserve_time_min_seqno_ = - seqno_to_time_mapping_.GetProximalSeqnoBeforeTime(preserve_time) + 1; - if (c->immutable_options()->preclude_last_level_data_seconds > 0) { - uint64_t preclude_last_level_time = - static_cast(_current_time) > - c->immutable_options()->preclude_last_level_data_seconds - ? _current_time - - c->immutable_options()->preclude_last_level_data_seconds - : 0; - preclude_last_level_min_seqno_ = - seqno_to_time_mapping_.GetProximalSeqnoBeforeTime( - preclude_last_level_time) + - 1; - } + seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos( + static_cast(_current_time), + c->immutable_options()->preserve_internal_time_seconds, + c->immutable_options()->preclude_last_level_data_seconds, + &preserve_time_min_seqno_, &preclude_last_level_min_seqno_); } // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only // limit the capacity after them. @@ -1295,8 +1282,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { auto c_iter = std::make_unique( input, cfd->user_comparator(), &merge, versions_->LastSequence(), - &existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq, - snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), + &existing_snapshots_, earliest_snapshot_, + earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, + env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, range_del_agg.get(), blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, @@ -1969,7 +1957,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact, cfd->GetName(), sub_compact->compaction->output_level(), bottommost_level_, TableFileCreationReason::kCompaction, 0 /* oldest_key_time */, current_time, db_id_, db_session_id_, - sub_compact->compaction->max_output_file_size(), file_number); + sub_compact->compaction->max_output_file_size(), file_number, + preclude_last_level_min_seqno_ == kMaxSequenceNumber + ? preclude_last_level_min_seqno_ + : std::min(earliest_snapshot_, preclude_last_level_min_seqno_)); outputs.NewBuilder(tboptions); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index caa1593e72..224f4e46f3 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -314,6 +314,8 @@ class CompactionJob { // deleted because that version is not visible in any snapshot. std::vector existing_snapshots_; + SequenceNumber earliest_snapshot_; + // This is the earliest snapshot that could be used for write-conflict // checking by a transaction. For any user-key newer than this snapshot, we // should make sure not to remove evidence that a write occurred. diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index c99db98509..93b9791210 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -13,6 +13,7 @@ #include "rocksdb/iostats_context.h" #include "rocksdb/listener.h" #include "rocksdb/utilities/debug.h" +#include "rocksdb/utilities/table_properties_collectors.h" #include "test_util/mock_time_env.h" #include "utilities/merge_operators.h" @@ -1734,6 +1735,82 @@ TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) { Close(); } +TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) { + const int kNumTrigger = 10; + const int kNumLevels = 7; + const int kNumKeys = 200; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 60; + options.preserve_internal_time_seconds = 0; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + options.last_level_temperature = Temperature::kCold; + ConfigOptions config_options; + config_options.ignore_unsupported_options = false; + std::shared_ptr factory; + std::string id = CompactForTieringCollectorFactory::kClassName(); + ASSERT_OK(TablePropertiesCollectorFactory::CreateFromString( + config_options, "compaction_trigger_ratio=0.4; id=" + id, &factory)); + auto collector_factory = + factory->CheckedCast(); + options.table_properties_collector_factories.push_back(factory); + DestroyAndReopen(options); + WriteOptions wo; + wo.protection_bytes_per_key = GetParam(); + + Random rnd(301); + + dbfull()->TEST_WaitForPeriodicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(rnd.Uniform(10) + 1)); + }); + + for (int i = 0; i < kNumKeys / 4; i++) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo)); + dbfull()->TEST_WaitForPeriodicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(rnd.Uniform(2))); + }); + } + // Create one file with regular Put. + ASSERT_OK(Flush()); + + // Create one file with TimedPut. + // These data are eligible to be put on the last level once written to db + // and compaction will fast track them to the last level. + for (int i = kNumKeys / 4; i < kNumKeys / 2; i++) { + ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo)); + } + ASSERT_OK(Flush()); + + // TimedPut file moved to the last level via auto triggered compaction. + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + collector_factory->SetCompactionTriggerRatio(1.1); + for (int i = kNumKeys / 2; i < kNumKeys * 3 / 4; i++) { + ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo)); + } + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ("2,0,0,0,0,0,1", FilesPerLevel()); + + collector_factory->SetCompactionTriggerRatio(0); + for (int i = kNumKeys * 3 / 4; i < kNumKeys; i++) { + ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo)); + } + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ("3,0,0,0,0,0,1", FilesPerLevel()); + + Close(); +} + INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest, TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8)); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 61aa289b71..0bd72b9102 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1670,6 +1670,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + SequenceNumber earliest_snapshot = + (snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0)); auto snapshot_checker = snapshot_checker_.get(); if (use_custom_gc_ && snapshot_checker == nullptr) { snapshot_checker = DisableGCSnapshotChecker::Instance(); @@ -1689,6 +1691,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, IOStatus io_s; const ReadOptions read_option(Env::IOActivity::kDBOpen); const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen); + TableBuilderOptions tboptions( *cfd->ioptions(), mutable_cf_options, read_option, write_option, cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(), @@ -1697,21 +1700,22 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, 0 /* level */, false /* is_bottommost */, TableFileCreationReason::kRecovery, 0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_, db_session_id_, - 0 /* target_file_size */, meta.fd.GetNumber()); + 0 /* target_file_size */, meta.fd.GetNumber(), kMaxSequenceNumber); Version* version = cfd->current(); version->Ref(); uint64_t num_input_entries = 0; - s = BuildTable( - dbname_, versions_.get(), immutable_db_options_, tboptions, - file_options_for_compaction_, cfd->table_cache(), iter.get(), - std::move(range_del_iters), &meta, &blob_file_additions, - snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber, - snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, - io_tracer_, BlobFileCreationReason::kRecovery, - nullptr /* seqno_to_time_mapping */, &event_logger_, job_id, - nullptr /* table_properties */, write_hint, - nullptr /*full_history_ts_low*/, &blob_callback_, version, - &num_input_entries); + s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions, + file_options_for_compaction_, cfd->table_cache(), + iter.get(), std::move(range_del_iters), &meta, + &blob_file_additions, snapshot_seqs, earliest_snapshot, + earliest_write_conflict_snapshot, kMaxSequenceNumber, + snapshot_checker, paranoid_file_checks, + cfd->internal_stats(), &io_s, io_tracer_, + BlobFileCreationReason::kRecovery, + nullptr /* seqno_to_time_mapping */, &event_logger_, + job_id, nullptr /* table_properties */, write_hint, + nullptr /*full_history_ts_low*/, &blob_callback_, version, + &num_input_entries); version->Unref(); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, diff --git a/db/flush_job.cc b/db/flush_job.cc index 1a317f3b23..72edf18dd8 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -115,6 +115,9 @@ FlushJob::FlushJob( db_mutex_(db_mutex), shutting_down_(shutting_down), existing_snapshots_(std::move(existing_snapshots)), + earliest_snapshot_(existing_snapshots_.empty() + ? kMaxSequenceNumber + : existing_snapshots_.at(0)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), job_context_(job_context), @@ -194,6 +197,7 @@ void FlushJob::PickMemTable() { // Track effective cutoff user-defined timestamp during flush if // user-defined timestamps can be stripped. GetEffectiveCutoffUDTForPickedMemTables(); + GetPrecludeLastLevelMinSeqno(); ReportFlushInputSize(mems_); @@ -502,7 +506,7 @@ Status FlushJob::MemPurge() { const std::atomic kManualCompactionCanceledFalse{false}; CompactionIterator c_iter( iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, - kMaxSequenceNumber, &existing_snapshots_, + kMaxSequenceNumber, &existing_snapshots_, earliest_snapshot_, earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, env, ShouldReportDetailedTime(env, ioptions->stats), true /* internal key corruption is not ok */, range_del_agg.get(), @@ -968,14 +972,17 @@ Status FlushJob::WriteLevel0Table() { cfd_->GetID(), cfd_->GetName(), 0 /* level */, false /* is_bottommost */, TableFileCreationReason::kFlush, oldest_key_time, current_time, db_id_, db_session_id_, - 0 /* target_file_size */, meta_.fd.GetNumber()); + 0 /* target_file_size */, meta_.fd.GetNumber(), + preclude_last_level_min_seqno_ == kMaxSequenceNumber + ? preclude_last_level_min_seqno_ + : std::min(earliest_snapshot_, preclude_last_level_min_seqno_)); const SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence(); s = BuildTable( dbname_, versions_, db_options_, tboptions, file_options_, cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, - &blob_file_additions, existing_snapshots_, + &blob_file_additions, existing_snapshots_, earliest_snapshot_, earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, @@ -1154,6 +1161,26 @@ void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() { } } +void FlushJob::GetPrecludeLastLevelMinSeqno() { + if (cfd_->ioptions()->preclude_last_level_data_seconds == 0) { + return; + } + int64_t current_time = 0; + Status s = db_options_.clock->GetCurrentTime(¤t_time); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to get current time in Flush: Status: %s", + s.ToString().c_str()); + } else { + SequenceNumber preserve_time_min_seqno; + seqno_to_time_mapping_->GetCurrentTieringCutoffSeqnos( + static_cast(current_time), + cfd_->ioptions()->preserve_internal_time_seconds, + cfd_->ioptions()->preclude_last_level_data_seconds, + &preserve_time_min_seqno, &preclude_last_level_min_seqno_); + } +} + Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() { db_mutex_->AssertHeld(); const auto* ucmp = cfd_->user_comparator(); diff --git a/db/flush_job.h b/db/flush_job.h index 337e9cd9bc..596d5c2045 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -143,6 +143,13 @@ class FlushJob { // `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details. void GetEffectiveCutoffUDTForPickedMemTables(); + // If this column family enables tiering feature, it will find the current + // `preclude_last_level_min_seqno_`, and the smaller one between this and + // the `earliset_snapshot_` will later be announced to user property + // collectors. It indicates to tiering use cases which data are old enough to + // be placed on the last level. + void GetPrecludeLastLevelMinSeqno(); + Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT(); const std::string& dbname_; @@ -161,6 +168,7 @@ class FlushJob { InstrumentedMutex* db_mutex_; std::atomic* shutting_down_; std::vector existing_snapshots_; + SequenceNumber earliest_snapshot_; SequenceNumber earliest_write_conflict_snapshot_; SnapshotChecker* snapshot_checker_; JobContext* job_context_; @@ -221,6 +229,12 @@ class FlushJob { // Keeps track of the newest user-defined timestamp for this flush job if // `persist_user_defined_timestamps` flag is false. std::string cutoff_udt_; + + // The current minimum seqno that compaction jobs will preclude the data from + // the last level. Data with seqnos larger than this or larger than + // `earliest_snapshot_` will be output to the penultimate level had it gone + // through a compaction to the last level. + SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/repair.cc b/db/repair.cc index 4fe8b47886..c3c96fefc2 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -480,9 +480,10 @@ class Repairer { dbname_, /* versions */ nullptr, immutable_db_options_, tboptions, file_options_, table_cache_.get(), iter.get(), std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, - {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker, - false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, - nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, + {}, kMaxSequenceNumber, kMaxSequenceNumber, kMaxSequenceNumber, + snapshot_checker, false /* paranoid_file_checks*/, + nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/, + BlobFileCreationReason::kRecovery, nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */, 0 /* job_id */, nullptr /* table_properties */, write_hint); ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index 6355570196..b540fd9196 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -69,6 +69,36 @@ SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime( return it->seqno; } +void SeqnoToTimeMapping::GetCurrentTieringCutoffSeqnos( + uint64_t current_time, uint64_t preserve_internal_time_seconds, + uint64_t preclude_last_level_data_seconds, + SequenceNumber* preserve_time_min_seqno, + SequenceNumber* preclude_last_level_min_seqno) const { + uint64_t preserve_time_duration = std::max(preserve_internal_time_seconds, + preclude_last_level_data_seconds); + if (preserve_time_duration <= 0) { + return; + } + uint64_t preserve_time = current_time > preserve_time_duration + ? current_time - preserve_time_duration + : 0; + // GetProximalSeqnoBeforeTime tells us the last seqno known to have been + // written at or before the given time. + 1 to get the minimum we should + // preserve without excluding anything that might have been written on or + // after the given time. + if (preserve_time_min_seqno) { + *preserve_time_min_seqno = GetProximalSeqnoBeforeTime(preserve_time) + 1; + } + if (preclude_last_level_data_seconds > 0 && preclude_last_level_min_seqno) { + uint64_t preclude_last_level_time = + current_time > preclude_last_level_data_seconds + ? current_time - preclude_last_level_data_seconds + : 0; + *preclude_last_level_min_seqno = + GetProximalSeqnoBeforeTime(preclude_last_level_time) + 1; + } +} + void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) { assert(enforced_); // at least sorted uint64_t cutoff_time; diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index a9255a806f..741e643694 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -213,6 +213,15 @@ class SeqnoToTimeMapping { // must be in enforced state as a precondition. SequenceNumber GetProximalSeqnoBeforeTime(uint64_t time) const; + // Given current time, the configured `preserve_internal_time_seconds`, and + // `preclude_last_level_data_seconds`, find the relevant cutoff sequence + // numbers for tiering. + void GetCurrentTieringCutoffSeqnos( + uint64_t current_time, uint64_t preserve_internal_time_seconds, + uint64_t preclude_last_level_data_seconds, + SequenceNumber* preserve_time_min_seqno, + SequenceNumber* preclude_last_level_min_seqno) const; + // Encode to a binary string by appending to `dest`. // Because this is a const operation depending on sortedness, the structure // must be in enforced state as a precondition. diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 53aff51cba..6b745b6549 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -44,7 +44,9 @@ class InternalTblPropCollFactory { virtual ~InternalTblPropCollFactory() {} // has to be thread-safe virtual InternalTblPropColl* CreateInternalTblPropColl( - uint32_t column_family_id, int level_at_creation) = 0; + uint32_t column_family_id, int level_at_creation, int num_levels, + SequenceNumber last_level_inclusive_max_seqno_threshold = + kMaxSequenceNumber) = 0; // The name of the properties collector can be used for debugging purpose. virtual const char* Name() const = 0; @@ -92,10 +94,15 @@ class UserKeyTablePropertiesCollectorFactory std::shared_ptr user_collector_factory) : user_collector_factory_(user_collector_factory) {} InternalTblPropColl* CreateInternalTblPropColl( - uint32_t column_family_id, int level_at_creation) override { + uint32_t column_family_id, int level_at_creation, int num_levels, + SequenceNumber last_level_inclusive_max_seqno_threshold = + kMaxSequenceNumber) override { TablePropertiesCollectorFactory::Context context; context.column_family_id = column_family_id; context.level_at_creation = level_at_creation; + context.num_levels = num_levels; + context.last_level_inclusive_max_seqno_threshold = + last_level_inclusive_max_seqno_threshold; TablePropertiesCollector* collector = user_collector_factory_->CreateTablePropertiesCollector(context); if (collector) { diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index a10ebdc24d..c9dfb7d0ff 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -209,7 +209,9 @@ class RegularKeysStartWithAFactory : public InternalTblPropCollFactory, } } InternalTblPropColl* CreateInternalTblPropColl( - uint32_t /*column_family_id*/, int /* level_at_creation */) override { + uint32_t /*column_family_id*/, int /* level_at_creation */, + int /* num_levels */, + SequenceNumber /* last_level_inclusive_max_seqno_threshold */) override { return new RegularKeysStartWithAInternal(); } const char* Name() const override { return "RegularKeysStartWithA"; } diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index f444275b96..225cd7788a 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -157,8 +157,25 @@ class TablePropertiesCollectorFactory : public Customizable { // The level at creating the SST file (i.e, table), of which the // properties are being collected. int level_at_creation = kUnknownLevelAtCreation; + int num_levels = kUnknownNumLevels; + // In the tiering case, data with seqnos smaller than or equal to this + // cutoff sequence number will be considered by a compaction job as eligible + // to be placed on the last level. When this is the maximum sequence number, + // it indicates tiering is disabled. + SequenceNumber last_level_inclusive_max_seqno_threshold; static const uint32_t kUnknownColumnFamily; static const int kUnknownLevelAtCreation = -1; + static const int kUnknownNumLevels = -1; + + Context() {} + + Context(uint32_t _column_family_id, int _level_at_creation, int _num_levels, + SequenceNumber _last_level_inclusive_max_seqno_threshold) + : column_family_id(_column_family_id), + level_at_creation(_level_at_creation), + num_levels(_num_levels), + last_level_inclusive_max_seqno_threshold( + _last_level_inclusive_max_seqno_threshold) {} }; ~TablePropertiesCollectorFactory() override {} diff --git a/include/rocksdb/utilities/table_properties_collectors.h b/include/rocksdb/utilities/table_properties_collectors.h index 064ce32f4a..327e80af72 100644 --- a/include/rocksdb/utilities/table_properties_collectors.h +++ b/include/rocksdb/utilities/table_properties_collectors.h @@ -84,4 +84,50 @@ std::shared_ptr NewCompactOnDeletionCollectorFactory(size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio = 0); + +// A factory of a table property collector that marks a SST file as +// need-compaction when for the tiering use case, it observes, among all the +// data entries, the ratio of entries that are already eligible to be placed on +// the last level but are not yet on the last level is equal to or higher than +// the configured `compaction_trigger_ratio_`. +// 1) Setting the ratio to be equal to or smaller than 0 disables this collector +// 2) Setting the ratio to be within (0, 1] will write the number of +// observed eligible entries into a user property and marks a file as +// need-compaction when aforementioned condition is met. +// 3) Setting the ratio to be higher than 1 can be used to just writes the user +// table property, and not mark any file as need compaction. +// For a column family that does not enable tiering feature, even if an +// effective configuration is provided, this collector is still disabled. +class CompactForTieringCollectorFactory + : public TablePropertiesCollectorFactory { + public: + // @param compaction_trigger_ratio: the triggering threshold for the ratio of + // eligible entries to the total number of entries. See class documentation + // for what entry is eligible. + CompactForTieringCollectorFactory(double compaction_trigger_ratio); + + ~CompactForTieringCollectorFactory() {} + + TablePropertiesCollector* CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context context) override; + + void SetCompactionTriggerRatio(double new_ratio) { + compaction_trigger_ratio_.store(new_ratio); + } + + double GetCompactionTriggerRatio() const { + return compaction_trigger_ratio_.load(); + } + + static const char* kClassName() { return "CompactForTieringCollector"; } + const char* Name() const override { return kClassName(); } + + std::string ToString() const override; + + private: + std::atomic compaction_trigger_ratio_; +}; + +std::shared_ptr +NewCompactForTieringCollectorFactory(double compaction_trigger_ratio); } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 23cf348e1e..2206f84005 100644 --- a/src.mk +++ b/src.mk @@ -304,6 +304,7 @@ LIB_SOURCES = \ utilities/persistent_cache/volatile_tier_impl.cc \ utilities/simulator_cache/cache_simulator.cc \ utilities/simulator_cache/sim_cache.cc \ + utilities/table_properties_collectors/compact_for_tiering_collector.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/trace/file_trace_reader_writer.cc \ utilities/trace/replayer_impl.cc \ @@ -628,6 +629,7 @@ TEST_MAIN_SOURCES = \ utilities/persistent_cache/persistent_cache_test.cc \ utilities/simulator_cache/cache_simulator_test.cc \ utilities/simulator_cache/sim_cache_test.cc \ + utilities/table_properties_collectors/compact_for_tiering_collector_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/lock/range/range_locking_test.cc \ diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 060a541e39..87f0bd61c1 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -582,8 +582,10 @@ struct BlockBasedTableBuilder::Rep { assert(factory); std::unique_ptr collector{ - factory->CreateInternalTblPropColl(tbo.column_family_id, - tbo.level_at_creation)}; + factory->CreateInternalTblPropColl( + tbo.column_family_id, tbo.level_at_creation, + tbo.ioptions.num_levels, + tbo.last_level_inclusive_max_seqno_threshold)}; if (collector) { table_properties_collectors.emplace_back(std::move(collector)); } diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index f0443bd945..541b4a5b76 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -119,8 +119,8 @@ PlainTableBuilder::PlainTableBuilder( assert(factory); std::unique_ptr collector{ - factory->CreateInternalTblPropColl(column_family_id, - level_at_creation)}; + factory->CreateInternalTblPropColl(column_family_id, level_at_creation, + ioptions.num_levels)}; if (collector) { table_properties_collectors_.emplace_back(std::move(collector)); } diff --git a/table/sst_file_writer_collectors.h b/table/sst_file_writer_collectors.h index 5f421dffb9..28c6e03835 100644 --- a/table/sst_file_writer_collectors.h +++ b/table/sst_file_writer_collectors.h @@ -79,7 +79,9 @@ class SstFileWriterPropertiesCollectorFactory : version_(version), global_seqno_(global_seqno) {} InternalTblPropColl* CreateInternalTblPropColl( - uint32_t /*column_family_id*/, int /* level_at_creation */) override { + uint32_t /*column_family_id*/, int /* level_at_creation */, + int /* num_levels */, + SequenceNumber /* last_level_inclusive_max_seqno_threshold */) override { return new SstFileWriterPropertiesCollector(version_, global_seqno_); } diff --git a/table/table_builder.h b/table/table_builder.h index b2866c25b2..0a1944e1f3 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -99,7 +99,7 @@ struct TableReaderOptions { bool user_defined_timestamps_persisted; }; -struct TableBuilderOptions { +struct TableBuilderOptions : public TablePropertiesCollectorFactory::Context { TableBuilderOptions( const ImmutableOptions& _ioptions, const MutableCFOptions& _moptions, const ReadOptions& _read_options, const WriteOptions& _write_options, @@ -113,8 +113,13 @@ struct TableBuilderOptions { const int64_t _oldest_key_time = 0, const uint64_t _file_creation_time = 0, const std::string& _db_id = "", const std::string& _db_session_id = "", - const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0) - : ioptions(_ioptions), + const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0, + const SequenceNumber _last_level_inclusive_max_seqno_threshold = + kMaxSequenceNumber) + : TablePropertiesCollectorFactory::Context( + _column_family_id, _level, _ioptions.num_levels, + _last_level_inclusive_max_seqno_threshold), + ioptions(_ioptions), moptions(_moptions), read_options(_read_options), write_options(_write_options), @@ -122,14 +127,12 @@ struct TableBuilderOptions { internal_tbl_prop_coll_factories(_internal_tbl_prop_coll_factories), compression_type(_compression_type), compression_opts(_compression_opts), - column_family_id(_column_family_id), column_family_name(_column_family_name), oldest_key_time(_oldest_key_time), target_file_size(_target_file_size), file_creation_time(_file_creation_time), db_id(_db_id), db_session_id(_db_session_id), - level_at_creation(_level), is_bottommost(_is_bottommost), reason(_reason), cur_file_num(_cur_file_num) {} @@ -142,7 +145,6 @@ struct TableBuilderOptions { const InternalTblPropCollFactories* internal_tbl_prop_coll_factories; const CompressionType compression_type; const CompressionOptions& compression_opts; - const uint32_t column_family_id; const std::string& column_family_name; const int64_t oldest_key_time; const uint64_t target_file_size; @@ -150,7 +152,6 @@ struct TableBuilderOptions { const std::string db_id; const std::string db_session_id; // BEGIN for FilterBuildingContext - const int level_at_creation; const bool is_bottommost; const TableFileCreationReason reason; // END for FilterBuildingContext diff --git a/unreleased_history/new_features/auto_trigger_compaction_for_tiering.md b/unreleased_history/new_features/auto_trigger_compaction_for_tiering.md new file mode 100644 index 0000000000..973b40bfbc --- /dev/null +++ b/unreleased_history/new_features/auto_trigger_compaction_for_tiering.md @@ -0,0 +1 @@ +Added a `CompactForTieringCollectorFactory` to auto trigger compaction for tiering use case. \ No newline at end of file diff --git a/utilities/table_properties_collectors/compact_for_tiering_collector.cc b/utilities/table_properties_collectors/compact_for_tiering_collector.cc new file mode 100644 index 0000000000..93759ffd6f --- /dev/null +++ b/utilities/table_properties_collectors/compact_for_tiering_collector.cc @@ -0,0 +1,146 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/table_properties_collectors/compact_for_tiering_collector.h" + +#include + +#include "db/seqno_to_time_mapping.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/customizable_util.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" +#include "rocksdb/utilities/table_properties_collectors.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +const std::string + CompactForTieringCollector::kNumEligibleLastLevelEntriesPropertyName = + "rocksdb.eligible.last.level.entries"; + +CompactForTieringCollector::CompactForTieringCollector( + SequenceNumber last_level_inclusive_max_seqno_threshold, + double compaction_trigger_ratio) + : last_level_inclusive_max_seqno_threshold_( + last_level_inclusive_max_seqno_threshold), + compaction_trigger_ratio_(compaction_trigger_ratio) { + assert(last_level_inclusive_max_seqno_threshold_ != kMaxSequenceNumber); +} + +Status CompactForTieringCollector::AddUserKey(const Slice& /*key*/, + const Slice& value, + EntryType type, + SequenceNumber seq, + uint64_t /*file_size*/) { + SequenceNumber seq_for_check = seq; + if (type == kEntryTimedPut) { + seq_for_check = ParsePackedValueForSeqno(value); + } + if (seq_for_check < last_level_inclusive_max_seqno_threshold_) { + last_level_eligible_entries_counter_++; + } + total_entries_counter_ += 1; + return Status::OK(); +} + +Status CompactForTieringCollector::Finish(UserCollectedProperties* properties) { + assert(!finish_called_); + assert(compaction_trigger_ratio_ > 0); + if (last_level_eligible_entries_counter_ >= + compaction_trigger_ratio_ * total_entries_counter_) { + assert(compaction_trigger_ratio_ <= 1); + need_compaction_ = true; + } + if (last_level_eligible_entries_counter_ > 0) { + *properties = UserCollectedProperties{ + {kNumEligibleLastLevelEntriesPropertyName, + std::to_string(last_level_eligible_entries_counter_)}, + }; + } + finish_called_ = true; + return Status::OK(); +} + +UserCollectedProperties CompactForTieringCollector::GetReadableProperties() + const { + return UserCollectedProperties{ + {kNumEligibleLastLevelEntriesPropertyName, + std::to_string(last_level_eligible_entries_counter_)}, + }; +} + +bool CompactForTieringCollector::NeedCompact() const { + return need_compaction_; +} + +void CompactForTieringCollector::Reset() { + last_level_eligible_entries_counter_ = 0; + total_entries_counter_ = 0; + finish_called_ = false; + need_compaction_ = false; +} + +TablePropertiesCollector* +CompactForTieringCollectorFactory::CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context context) { + double compaction_trigger_ratio = GetCompactionTriggerRatio(); + if (compaction_trigger_ratio <= 0 || + context.level_at_creation == context.num_levels - 1 || + context.last_level_inclusive_max_seqno_threshold == kMaxSequenceNumber) { + return nullptr; + } + return new CompactForTieringCollector( + context.last_level_inclusive_max_seqno_threshold, + compaction_trigger_ratio); +} + +static std::unordered_map + on_compact_for_tiering_type_info = { + {"compaction_trigger_ratio", + {0, OptionType::kUnknown, OptionVerificationType::kNormal, + OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable, + [](const ConfigOptions&, const std::string&, const std::string& value, + void* addr) { + auto* factory = + static_cast(addr); + factory->SetCompactionTriggerRatio(ParseDouble(value)); + return Status::OK(); + }, + [](const ConfigOptions&, const std::string&, const void* addr, + std::string* value) { + const auto* factory = + static_cast(addr); + *value = std::to_string(factory->GetCompactionTriggerRatio()); + return Status::OK(); + }, + nullptr}}, + +}; + +CompactForTieringCollectorFactory::CompactForTieringCollectorFactory( + double compaction_trigger_ratio) + : compaction_trigger_ratio_(compaction_trigger_ratio) { + RegisterOptions("", this, &on_compact_for_tiering_type_info); +} + +std::string CompactForTieringCollectorFactory::ToString() const { + std::ostringstream cfg; + cfg << Name() + << ", compaction trigger ratio:" << compaction_trigger_ratio_.load() + << std::endl; + return cfg.str(); +} + +std::shared_ptr +NewCompactForTieringCollectorFactory(double compaction_trigger_ratio) { + return std::make_shared( + compaction_trigger_ratio); + return std::shared_ptr( + new CompactForTieringCollectorFactory(compaction_trigger_ratio)); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/table_properties_collectors/compact_for_tiering_collector.h b/utilities/table_properties_collectors/compact_for_tiering_collector.h new file mode 100644 index 0000000000..55497b671f --- /dev/null +++ b/utilities/table_properties_collectors/compact_for_tiering_collector.h @@ -0,0 +1,45 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/utilities/table_properties_collectors.h" + +namespace ROCKSDB_NAMESPACE { + +// A user property collector that marks a SST file as need-compaction when for +// the tiering use case. See documentation for +// `CompactForTieringCollectorFactory`. +class CompactForTieringCollector : public TablePropertiesCollector { + public: + static const std::string kNumEligibleLastLevelEntriesPropertyName; + + CompactForTieringCollector( + SequenceNumber last_level_inclusive_max_seqno_threshold_, + double compaction_trigger_ratio); + + Status AddUserKey(const Slice& key, const Slice& value, EntryType type, + SequenceNumber seq, uint64_t file_size) override; + + Status Finish(UserCollectedProperties* properties) override; + + UserCollectedProperties GetReadableProperties() const override; + + const char* Name() const override { return "CompactForTieringCollector"; } + + bool NeedCompact() const override; + + private: + void Reset(); + + SequenceNumber last_level_inclusive_max_seqno_threshold_; + double compaction_trigger_ratio_; + size_t last_level_eligible_entries_counter_ = 0; + size_t total_entries_counter_ = 0; + bool finish_called_ = false; + bool need_compaction_ = false; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/table_properties_collectors/compact_for_tiering_collector_test.cc b/utilities/table_properties_collectors/compact_for_tiering_collector_test.cc new file mode 100644 index 0000000000..a054e402f3 --- /dev/null +++ b/utilities/table_properties_collectors/compact_for_tiering_collector_test.cc @@ -0,0 +1,139 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/table_properties_collectors/compact_for_tiering_collector.h" + +#include +#include +#include +#include +#include + +#include "db/seqno_to_time_mapping.h" +#include "port/stack_trace.h" +#include "rocksdb/table.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/utilities/table_properties_collectors.h" +#include "test_util/testharness.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(CompactForTieringCollector, NotEnabled) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = 1; + context.level_at_creation = 1; + context.num_levels = 6; + context.last_level_inclusive_max_seqno_threshold = 50; + + // Set compaction trigger ratio to 0 to disable it. No collector created. + auto factory = NewCompactForTieringCollectorFactory(0); + std::unique_ptr collector( + factory->CreateTablePropertiesCollector(context)); + ASSERT_EQ(nullptr, collector); +} + +TEST(CompactForTieringCollector, TieringDisabled) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = 1; + context.level_at_creation = 1; + context.num_levels = 6; + context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber; + + // Tiering is disabled on the column family. No collector created. + { + for (double compaction_trigger_ratio : {0.0, 0.1, 1.0, 1.5}) { + auto factory = + NewCompactForTieringCollectorFactory(compaction_trigger_ratio); + std::unique_ptr collector( + factory->CreateTablePropertiesCollector(context)); + ASSERT_EQ(nullptr, collector); + } + } +} + +TEST(CompactForTieringCollector, LastLevelFile) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = 1; + context.level_at_creation = 5; + context.num_levels = 6; + context.last_level_inclusive_max_seqno_threshold = 50; + + // No collector created for a file that is already on the last level. + { + for (double compaction_trigger_ratio : {0.0, 0.1, 1.0, 1.5}) { + auto factory = + NewCompactForTieringCollectorFactory(compaction_trigger_ratio); + std::unique_ptr collector( + factory->CreateTablePropertiesCollector(context)); + ASSERT_EQ(nullptr, collector); + } + } +} + +TEST(CompactForTieringCollector, CollectorEnabled) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = 1; + context.level_at_creation = 1; + context.num_levels = 6; + context.last_level_inclusive_max_seqno_threshold = 50; + const size_t kTotalEntries = 100; + + { + for (double compaction_trigger_ratio : {0.1, 0.33333333, 0.5, 1.0, 1.5}) { + auto factory = + NewCompactForTieringCollectorFactory(compaction_trigger_ratio); + std::unique_ptr collector( + factory->CreateTablePropertiesCollector(context)); + for (size_t i = 0; i < kTotalEntries; i++) { + ASSERT_OK(collector->AddUserKey("hello", "rocksdb", kEntryPut, i, 0)); + ASSERT_FALSE(collector->NeedCompact()); + } + UserCollectedProperties user_properties; + ASSERT_OK(collector->Finish(&user_properties)); + ASSERT_EQ(user_properties[CompactForTieringCollector:: + kNumEligibleLastLevelEntriesPropertyName], + std::to_string(50)); + if (compaction_trigger_ratio > 0.5) { + ASSERT_FALSE(collector->NeedCompact()); + } else { + ASSERT_TRUE(collector->NeedCompact()); + } + } + } +} + +TEST(CompactForTieringCollector, TimedPutEntries) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = 1; + context.level_at_creation = 1; + context.num_levels = 6; + context.last_level_inclusive_max_seqno_threshold = 50; + const size_t kTotalEntries = 100; + + auto factory = NewCompactForTieringCollectorFactory(0.1); + std::unique_ptr collector( + factory->CreateTablePropertiesCollector(context)); + for (size_t i = 0; i < kTotalEntries; i++) { + std::string value; + PackValueAndSeqno("rocksdb", i, &value); + ASSERT_OK(collector->AddUserKey("hello", value, kEntryTimedPut, 0, 0)); + ASSERT_FALSE(collector->NeedCompact()); + } + UserCollectedProperties user_properties; + ASSERT_OK(collector->Finish(&user_properties)); + ASSERT_EQ(user_properties[CompactForTieringCollector:: + kNumEligibleLastLevelEntriesPropertyName], + std::to_string(50)); + ASSERT_TRUE(collector->NeedCompact()); +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/utilities/table_properties_collectors/compact_on_deletion_collector.cc b/utilities/table_properties_collectors/compact_on_deletion_collector.cc index a3b5189da0..a175d0a016 100644 --- a/utilities/table_properties_collectors/compact_on_deletion_collector.cc +++ b/utilities/table_properties_collectors/compact_on_deletion_collector.cc @@ -188,6 +188,7 @@ NewCompactOnDeletionCollectorFactory(size_t sliding_window_size, new CompactOnDeletionCollectorFactory(sliding_window_size, deletion_trigger, deletion_ratio)); } + namespace { static int RegisterTablePropertiesCollectorFactories( ObjectLibrary& library, const std::string& /*arg*/) { @@ -202,6 +203,17 @@ static int RegisterTablePropertiesCollectorFactories( guard->reset(new CompactOnDeletionCollectorFactory(0, 0, 0)); return guard->get(); }); + library.AddFactory( + CompactForTieringCollectorFactory::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* guard, + std::string* /* errmsg */) { + // By default, create a `CompactForTieringCollectorFactory` that is + // disabled. Users will need to call corresponding setters to enable + // the factory. + guard->reset(new CompactForTieringCollectorFactory(0)); + return guard->get(); + }); return 1; } } // namespace diff --git a/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc b/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc index 34a47dbf3f..9fec089fc1 100644 --- a/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc +++ b/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc @@ -14,6 +14,7 @@ #include #include +#include "db/dbformat.h" #include "port/stack_trace.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" @@ -27,6 +28,7 @@ TEST(CompactOnDeletionCollector, DeletionRatio) { TablePropertiesCollectorFactory::Context context; context.column_family_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber; const size_t kTotalEntries = 100; { @@ -86,6 +88,7 @@ TEST(CompactOnDeletionCollector, SlidingWindow) { TablePropertiesCollectorFactory::Context context; context.column_family_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber; std::vector window_sizes; std::vector deletion_triggers;