Add CompactForTieringCollector to support automatically trigger compaction for tiering use case (#12760)

Summary:
This PR adds user property collector factory `CompactForTieringCollectorFactory` to support observe SST file and mark it as need compaction for fast tracking data to the proper tier.

A triggering ratio `compaction_trigger_ratio_` can be configured to achieve the following:
1) Setting the ratio to be equal to or smaller than 0 disables this collector
2) Setting the ratio to be within (0, 1] will write the number of observed eligible entries into a user property and marks a file as need-compaction when aforementioned condition is met.
3) Setting the ratio to be higher than 1 can be used to just writes the user table property, and not mark any file as need compaction.
 For a column family that does not enable tiering feature, even if an effective configuration is provided, this collector is still disabled. For a file that is already on the last level, this collector is also disabled.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12760

Test Plan: Added unit tests

Reviewed By: pdillinger

Differential Revision: D58734976

Pulled By: jowlyzhang

fbshipit-source-id: 6daab2c4f62b5c6689c3c03e3b3907bbbe6b7a81
This commit is contained in:
Yu Zhang 2024-06-18 10:51:29 -07:00 committed by Facebook GitHub Bot
parent 9f95aa8269
commit c73cf7a878
32 changed files with 660 additions and 65 deletions

View File

@ -939,6 +939,7 @@ set(SOURCES
utilities/persistent_cache/volatile_tier_impl.cc utilities/persistent_cache/volatile_tier_impl.cc
utilities/simulator_cache/cache_simulator.cc utilities/simulator_cache/cache_simulator.cc
utilities/simulator_cache/sim_cache.cc utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_for_tiering_collector.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc utilities/trace/file_trace_reader_writer.cc
utilities/trace/replayer_impl.cc utilities/trace/replayer_impl.cc
@ -1481,6 +1482,7 @@ if(WITH_TESTS)
utilities/persistent_cache/persistent_cache_test.cc utilities/persistent_cache/persistent_cache_test.cc
utilities/simulator_cache/cache_simulator_test.cc utilities/simulator_cache/cache_simulator_test.cc
utilities/simulator_cache/sim_cache_test.cc utilities/simulator_cache/sim_cache_test.cc
utilities/table_properties_collectors/compact_for_tiering_collector_test.cc
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
utilities/transactions/optimistic_transaction_test.cc utilities/transactions/optimistic_transaction_test.cc
utilities/transactions/transaction_test.cc utilities/transactions/transaction_test.cc

View File

@ -1642,6 +1642,9 @@ compaction_job_stats_test: $(OBJ_DIR)/db/compaction/compaction_job_stats_test.o
compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY) compaction_service_test: $(OBJ_DIR)/db/compaction/compaction_service_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
compact_for_tiering_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_for_tiering_collector_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY) compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)

View File

@ -317,6 +317,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"utilities/persistent_cache/volatile_tier_impl.cc", "utilities/persistent_cache/volatile_tier_impl.cc",
"utilities/simulator_cache/cache_simulator.cc", "utilities/simulator_cache/cache_simulator.cc",
"utilities/simulator_cache/sim_cache.cc", "utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_for_tiering_collector.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc", "utilities/trace/file_trace_reader_writer.cc",
"utilities/trace/replayer_impl.cc", "utilities/trace/replayer_impl.cc",
@ -4626,6 +4627,12 @@ cpp_unittest_wrapper(name="compact_files_test",
extra_compiler_flags=[]) extra_compiler_flags=[])
cpp_unittest_wrapper(name="compact_for_tiering_collector_test",
srcs=["utilities/table_properties_collectors/compact_for_tiering_collector_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="compact_on_deletion_collector_test", cpp_unittest_wrapper(name="compact_on_deletion_collector_test",
srcs=["utilities/table_properties_collectors/compact_on_deletion_collector_test.cc"], srcs=["utilities/table_properties_collectors/compact_on_deletion_collector_test.cc"],
deps=[":rocksdb_test_lib"], deps=[":rocksdb_test_lib"],

View File

@ -64,7 +64,7 @@ Status BuildTable(
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters, range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions, FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots, std::vector<SequenceNumber> snapshots, SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats, bool paranoid_file_checks, InternalStats* internal_stats,
@ -195,7 +195,7 @@ Status BuildTable(
const std::atomic<bool> kManualCompactionCanceledFalse{false}; const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter( CompactionIterator c_iter(
iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, earliest_snapshot,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.stats), ShouldReportDetailedTime(env, ioptions.stats),
true /* internal key corruption is not ok */, range_del_agg.get(), true /* internal key corruption is not ok */, range_del_agg.get(),

View File

@ -57,7 +57,7 @@ Status BuildTable(
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters, range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions, FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
std::vector<SequenceNumber> snapshots, std::vector<SequenceNumber> snapshots, SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats, bool paranoid_file_checks, InternalStats* internal_stats,

View File

@ -25,6 +25,7 @@ namespace ROCKSDB_NAMESPACE {
CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key, Env* env, bool report_detailed_time, bool expect_valid_internal_key,
@ -40,7 +41,7 @@ CompactionIterator::CompactionIterator(
const SequenceNumber preserve_time_min_seqno, const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno) const SequenceNumber preclude_last_level_min_seqno)
: CompactionIterator( : CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots, input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg, report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts, blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
@ -54,6 +55,7 @@ CompactionIterator::CompactionIterator(
CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots, SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key, Env* env, bool report_detailed_time, bool expect_valid_internal_key,
@ -91,9 +93,7 @@ CompactionIterator::CompactionIterator(
// snapshots_ cannot be nullptr, but we will assert later in the body of // snapshots_ cannot be nullptr, but we will assert later in the body of
// the constructor. // the constructor.
visible_at_tip_(snapshots_ ? snapshots_->empty() : false), visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
earliest_snapshot_(!snapshots_ || snapshots_->empty() earliest_snapshot_(earliest_snapshot),
? kMaxSequenceNumber
: snapshots_->at(0)),
info_log_(info_log), info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors), allow_data_in_errors_(allow_data_in_errors),
enforce_single_del_contracts_(enforce_single_del_contracts), enforce_single_del_contracts_(enforce_single_del_contracts),

View File

@ -203,6 +203,7 @@ class CompactionIterator {
CompactionIterator( CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key, Env* env, bool report_detailed_time, bool expect_valid_internal_key,
@ -222,6 +223,7 @@ class CompactionIterator {
CompactionIterator( CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_snapshot,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key, Env* env, bool report_detailed_time, bool expect_valid_internal_key,

View File

@ -296,6 +296,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
iter_->SeekToFirst(); iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator( c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
snapshots_.empty() ? kMaxSequenceNumber : snapshots_.at(0),
earliest_write_conflict_snapshot, kMaxSequenceNumber, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker_.get(), Env::Default(), snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(), false /* report_detailed_time */, false, range_del_agg_.get(),

View File

@ -174,6 +174,9 @@ CompactionJob::CompactionJob(
db_mutex_(db_mutex), db_mutex_(db_mutex),
db_error_handler_(db_error_handler), db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)), existing_snapshots_(std::move(existing_snapshots)),
earliest_snapshot_(existing_snapshots_.empty()
? kMaxSequenceNumber
: existing_snapshots_.at(0)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker), snapshot_checker_(snapshot_checker),
job_context_(job_context), job_context_(job_context),
@ -282,6 +285,7 @@ void CompactionJob::Prepare() {
// collect all seqno->time information from the input files which will be used // collect all seqno->time information from the input files which will be used
// to encode seqno->time to the output files. // to encode seqno->time to the output files.
uint64_t preserve_time_duration = uint64_t preserve_time_duration =
std::max(c->immutable_options()->preserve_internal_time_seconds, std::max(c->immutable_options()->preserve_internal_time_seconds,
c->immutable_options()->preclude_last_level_data_seconds); c->immutable_options()->preclude_last_level_data_seconds);
@ -319,28 +323,11 @@ void CompactionJob::Prepare() {
seqno_to_time_mapping_.Enforce(); seqno_to_time_mapping_.Enforce();
} else { } else {
seqno_to_time_mapping_.Enforce(_current_time); seqno_to_time_mapping_.Enforce(_current_time);
uint64_t preserve_time = seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
static_cast<uint64_t>(_current_time) > preserve_time_duration static_cast<uint64_t>(_current_time),
? _current_time - preserve_time_duration c->immutable_options()->preserve_internal_time_seconds,
: 0; c->immutable_options()->preclude_last_level_data_seconds,
// GetProximalSeqnoBeforeTime tells us the last seqno known to have been &preserve_time_min_seqno_, &preclude_last_level_min_seqno_);
// written at or before the given time. + 1 to get the minimum we should
// preserve without excluding anything that might have been written on or
// after the given time.
preserve_time_min_seqno_ =
seqno_to_time_mapping_.GetProximalSeqnoBeforeTime(preserve_time) + 1;
if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
uint64_t preclude_last_level_time =
static_cast<uint64_t>(_current_time) >
c->immutable_options()->preclude_last_level_data_seconds
? _current_time -
c->immutable_options()->preclude_last_level_data_seconds
: 0;
preclude_last_level_min_seqno_ =
seqno_to_time_mapping_.GetProximalSeqnoBeforeTime(
preclude_last_level_time) +
1;
}
} }
// For accuracy of the GetProximalSeqnoBeforeTime queries above, we only // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
// limit the capacity after them. // limit the capacity after them.
@ -1295,8 +1282,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
auto c_iter = std::make_unique<CompactionIterator>( auto c_iter = std::make_unique<CompactionIterator>(
input, cfd->user_comparator(), &merge, versions_->LastSequence(), input, cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq, &existing_snapshots_, earliest_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, range_del_agg.get(), /*expect_valid_internal_key=*/true, range_del_agg.get(),
blob_file_builder.get(), db_options_.allow_data_in_errors, blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_, db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
@ -1969,7 +1957,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
cfd->GetName(), sub_compact->compaction->output_level(), cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction, bottommost_level_, TableFileCreationReason::kCompaction,
0 /* oldest_key_time */, current_time, db_id_, db_session_id_, 0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
sub_compact->compaction->max_output_file_size(), file_number); sub_compact->compaction->max_output_file_size(), file_number,
preclude_last_level_min_seqno_ == kMaxSequenceNumber
? preclude_last_level_min_seqno_
: std::min(earliest_snapshot_, preclude_last_level_min_seqno_));
outputs.NewBuilder(tboptions); outputs.NewBuilder(tboptions);

View File

@ -314,6 +314,8 @@ class CompactionJob {
// deleted because that version is not visible in any snapshot. // deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_; std::vector<SequenceNumber> existing_snapshots_;
SequenceNumber earliest_snapshot_;
// This is the earliest snapshot that could be used for write-conflict // This is the earliest snapshot that could be used for write-conflict
// checking by a transaction. For any user-key newer than this snapshot, we // checking by a transaction. For any user-key newer than this snapshot, we
// should make sure not to remove evidence that a write occurred. // should make sure not to remove evidence that a write occurred.

View File

@ -13,6 +13,7 @@
#include "rocksdb/iostats_context.h" #include "rocksdb/iostats_context.h"
#include "rocksdb/listener.h" #include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/mock_time_env.h" #include "test_util/mock_time_env.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
@ -1734,6 +1735,82 @@ TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Close(); Close();
} }
TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) {
const int kNumTrigger = 10;
const int kNumLevels = 7;
const int kNumKeys = 200;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preclude_last_level_data_seconds = 60;
options.preserve_internal_time_seconds = 0;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.num_levels = kNumLevels;
options.last_level_temperature = Temperature::kCold;
ConfigOptions config_options;
config_options.ignore_unsupported_options = false;
std::shared_ptr<TablePropertiesCollectorFactory> factory;
std::string id = CompactForTieringCollectorFactory::kClassName();
ASSERT_OK(TablePropertiesCollectorFactory::CreateFromString(
config_options, "compaction_trigger_ratio=0.4; id=" + id, &factory));
auto collector_factory =
factory->CheckedCast<CompactForTieringCollectorFactory>();
options.table_properties_collector_factories.push_back(factory);
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();
Random rnd(301);
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(10) + 1));
});
for (int i = 0; i < kNumKeys / 4; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
});
}
// Create one file with regular Put.
ASSERT_OK(Flush());
// Create one file with TimedPut.
// These data are eligible to be put on the last level once written to db
// and compaction will fast track them to the last level.
for (int i = kNumKeys / 4; i < kNumKeys / 2; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());
// TimedPut file moved to the last level via auto triggered compaction.
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
collector_factory->SetCompactionTriggerRatio(1.1);
for (int i = kNumKeys / 2; i < kNumKeys * 3 / 4; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2,0,0,0,0,0,1", FilesPerLevel());
collector_factory->SetCompactionTriggerRatio(0);
for (int i = kNumKeys * 3 / 4; i < kNumKeys; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("3,0,0,0,0,0,1", FilesPerLevel());
Close();
}
INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest, INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest,
TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8)); TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8));

View File

@ -1670,6 +1670,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
SequenceNumber earliest_snapshot =
(snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0));
auto snapshot_checker = snapshot_checker_.get(); auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) { if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance(); snapshot_checker = DisableGCSnapshotChecker::Instance();
@ -1689,6 +1691,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
IOStatus io_s; IOStatus io_s;
const ReadOptions read_option(Env::IOActivity::kDBOpen); const ReadOptions read_option(Env::IOActivity::kDBOpen);
const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen); const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen);
TableBuilderOptions tboptions( TableBuilderOptions tboptions(
*cfd->ioptions(), mutable_cf_options, read_option, write_option, *cfd->ioptions(), mutable_cf_options, read_option, write_option,
cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(), cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(),
@ -1697,21 +1700,22 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
0 /* level */, false /* is_bottommost */, 0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */, TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, db_id_, db_session_id_, 0 /* file_creation_time */, db_id_, db_session_id_,
0 /* target_file_size */, meta.fd.GetNumber()); 0 /* target_file_size */, meta.fd.GetNumber(), kMaxSequenceNumber);
Version* version = cfd->current(); Version* version = cfd->current();
version->Ref(); version->Ref();
uint64_t num_input_entries = 0; uint64_t num_input_entries = 0;
s = BuildTable( s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions,
dbname_, versions_.get(), immutable_db_options_, tboptions, file_options_for_compaction_, cfd->table_cache(),
file_options_for_compaction_, cfd->table_cache(), iter.get(), iter.get(), std::move(range_del_iters), &meta,
std::move(range_del_iters), &meta, &blob_file_additions, &blob_file_additions, snapshot_seqs, earliest_snapshot,
snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, snapshot_checker, paranoid_file_checks,
io_tracer_, BlobFileCreationReason::kRecovery, cfd->internal_stats(), &io_s, io_tracer_,
nullptr /* seqno_to_time_mapping */, &event_logger_, job_id, BlobFileCreationReason::kRecovery,
nullptr /* table_properties */, write_hint, nullptr /* seqno_to_time_mapping */, &event_logger_,
nullptr /*full_history_ts_low*/, &blob_callback_, version, job_id, nullptr /* table_properties */, write_hint,
&num_input_entries); nullptr /*full_history_ts_low*/, &blob_callback_, version,
&num_input_entries);
version->Unref(); version->Unref();
LogFlush(immutable_db_options_.info_log); LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, ROCKS_LOG_DEBUG(immutable_db_options_.info_log,

View File

@ -115,6 +115,9 @@ FlushJob::FlushJob(
db_mutex_(db_mutex), db_mutex_(db_mutex),
shutting_down_(shutting_down), shutting_down_(shutting_down),
existing_snapshots_(std::move(existing_snapshots)), existing_snapshots_(std::move(existing_snapshots)),
earliest_snapshot_(existing_snapshots_.empty()
? kMaxSequenceNumber
: existing_snapshots_.at(0)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker), snapshot_checker_(snapshot_checker),
job_context_(job_context), job_context_(job_context),
@ -194,6 +197,7 @@ void FlushJob::PickMemTable() {
// Track effective cutoff user-defined timestamp during flush if // Track effective cutoff user-defined timestamp during flush if
// user-defined timestamps can be stripped. // user-defined timestamps can be stripped.
GetEffectiveCutoffUDTForPickedMemTables(); GetEffectiveCutoffUDTForPickedMemTables();
GetPrecludeLastLevelMinSeqno();
ReportFlushInputSize(mems_); ReportFlushInputSize(mems_);
@ -502,7 +506,7 @@ Status FlushJob::MemPurge() {
const std::atomic<bool> kManualCompactionCanceledFalse{false}; const std::atomic<bool> kManualCompactionCanceledFalse{false};
CompactionIterator c_iter( CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_, kMaxSequenceNumber, &existing_snapshots_, earliest_snapshot_,
earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
env, ShouldReportDetailedTime(env, ioptions->stats), env, ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(), true /* internal key corruption is not ok */, range_del_agg.get(),
@ -968,14 +972,17 @@ Status FlushJob::WriteLevel0Table() {
cfd_->GetID(), cfd_->GetName(), 0 /* level */, cfd_->GetID(), cfd_->GetName(), 0 /* level */,
false /* is_bottommost */, TableFileCreationReason::kFlush, false /* is_bottommost */, TableFileCreationReason::kFlush,
oldest_key_time, current_time, db_id_, db_session_id_, oldest_key_time, current_time, db_id_, db_session_id_,
0 /* target_file_size */, meta_.fd.GetNumber()); 0 /* target_file_size */, meta_.fd.GetNumber(),
preclude_last_level_min_seqno_ == kMaxSequenceNumber
? preclude_last_level_min_seqno_
: std::min(earliest_snapshot_, preclude_last_level_min_seqno_));
const SequenceNumber job_snapshot_seq = const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence(); job_context_->GetJobSnapshotSequence();
s = BuildTable( s = BuildTable(
dbname_, versions_, db_options_, tboptions, file_options_, dbname_, versions_, db_options_, tboptions, file_options_,
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
&blob_file_additions, existing_snapshots_, &blob_file_additions, existing_snapshots_, earliest_snapshot_,
earliest_write_conflict_snapshot_, job_snapshot_seq, earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, mutable_cf_options_.paranoid_file_checks, snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_, cfd_->internal_stats(), &io_s, io_tracer_,
@ -1154,6 +1161,26 @@ void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
} }
} }
void FlushJob::GetPrecludeLastLevelMinSeqno() {
if (cfd_->ioptions()->preclude_last_level_data_seconds == 0) {
return;
}
int64_t current_time = 0;
Status s = db_options_.clock->GetCurrentTime(&current_time);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time in Flush: Status: %s",
s.ToString().c_str());
} else {
SequenceNumber preserve_time_min_seqno;
seqno_to_time_mapping_->GetCurrentTieringCutoffSeqnos(
static_cast<uint64_t>(current_time),
cfd_->ioptions()->preserve_internal_time_seconds,
cfd_->ioptions()->preclude_last_level_data_seconds,
&preserve_time_min_seqno, &preclude_last_level_min_seqno_);
}
}
Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() { Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() {
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
const auto* ucmp = cfd_->user_comparator(); const auto* ucmp = cfd_->user_comparator();

View File

@ -143,6 +143,13 @@ class FlushJob {
// `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details. // `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
void GetEffectiveCutoffUDTForPickedMemTables(); void GetEffectiveCutoffUDTForPickedMemTables();
// If this column family enables tiering feature, it will find the current
// `preclude_last_level_min_seqno_`, and the smaller one between this and
// the `earliset_snapshot_` will later be announced to user property
// collectors. It indicates to tiering use cases which data are old enough to
// be placed on the last level.
void GetPrecludeLastLevelMinSeqno();
Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT(); Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
const std::string& dbname_; const std::string& dbname_;
@ -161,6 +168,7 @@ class FlushJob {
InstrumentedMutex* db_mutex_; InstrumentedMutex* db_mutex_;
std::atomic<bool>* shutting_down_; std::atomic<bool>* shutting_down_;
std::vector<SequenceNumber> existing_snapshots_; std::vector<SequenceNumber> existing_snapshots_;
SequenceNumber earliest_snapshot_;
SequenceNumber earliest_write_conflict_snapshot_; SequenceNumber earliest_write_conflict_snapshot_;
SnapshotChecker* snapshot_checker_; SnapshotChecker* snapshot_checker_;
JobContext* job_context_; JobContext* job_context_;
@ -221,6 +229,12 @@ class FlushJob {
// Keeps track of the newest user-defined timestamp for this flush job if // Keeps track of the newest user-defined timestamp for this flush job if
// `persist_user_defined_timestamps` flag is false. // `persist_user_defined_timestamps` flag is false.
std::string cutoff_udt_; std::string cutoff_udt_;
// The current minimum seqno that compaction jobs will preclude the data from
// the last level. Data with seqnos larger than this or larger than
// `earliest_snapshot_` will be output to the penultimate level had it gone
// through a compaction to the last level.
SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -480,9 +480,10 @@ class Repairer {
dbname_, /* versions */ nullptr, immutable_db_options_, tboptions, dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
file_options_, table_cache_.get(), iter.get(), file_options_, table_cache_.get(), iter.get(),
std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
{}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker, {}, kMaxSequenceNumber, kMaxSequenceNumber, kMaxSequenceNumber,
false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, snapshot_checker, false /* paranoid_file_checks*/,
nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/,
BlobFileCreationReason::kRecovery,
nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */, nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */,
0 /* job_id */, nullptr /* table_properties */, write_hint); 0 /* job_id */, nullptr /* table_properties */, write_hint);
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,

View File

@ -69,6 +69,36 @@ SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime(
return it->seqno; return it->seqno;
} }
void SeqnoToTimeMapping::GetCurrentTieringCutoffSeqnos(
uint64_t current_time, uint64_t preserve_internal_time_seconds,
uint64_t preclude_last_level_data_seconds,
SequenceNumber* preserve_time_min_seqno,
SequenceNumber* preclude_last_level_min_seqno) const {
uint64_t preserve_time_duration = std::max(preserve_internal_time_seconds,
preclude_last_level_data_seconds);
if (preserve_time_duration <= 0) {
return;
}
uint64_t preserve_time = current_time > preserve_time_duration
? current_time - preserve_time_duration
: 0;
// GetProximalSeqnoBeforeTime tells us the last seqno known to have been
// written at or before the given time. + 1 to get the minimum we should
// preserve without excluding anything that might have been written on or
// after the given time.
if (preserve_time_min_seqno) {
*preserve_time_min_seqno = GetProximalSeqnoBeforeTime(preserve_time) + 1;
}
if (preclude_last_level_data_seconds > 0 && preclude_last_level_min_seqno) {
uint64_t preclude_last_level_time =
current_time > preclude_last_level_data_seconds
? current_time - preclude_last_level_data_seconds
: 0;
*preclude_last_level_min_seqno =
GetProximalSeqnoBeforeTime(preclude_last_level_time) + 1;
}
}
void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) { void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) {
assert(enforced_); // at least sorted assert(enforced_); // at least sorted
uint64_t cutoff_time; uint64_t cutoff_time;

View File

@ -213,6 +213,15 @@ class SeqnoToTimeMapping {
// must be in enforced state as a precondition. // must be in enforced state as a precondition.
SequenceNumber GetProximalSeqnoBeforeTime(uint64_t time) const; SequenceNumber GetProximalSeqnoBeforeTime(uint64_t time) const;
// Given current time, the configured `preserve_internal_time_seconds`, and
// `preclude_last_level_data_seconds`, find the relevant cutoff sequence
// numbers for tiering.
void GetCurrentTieringCutoffSeqnos(
uint64_t current_time, uint64_t preserve_internal_time_seconds,
uint64_t preclude_last_level_data_seconds,
SequenceNumber* preserve_time_min_seqno,
SequenceNumber* preclude_last_level_min_seqno) const;
// Encode to a binary string by appending to `dest`. // Encode to a binary string by appending to `dest`.
// Because this is a const operation depending on sortedness, the structure // Because this is a const operation depending on sortedness, the structure
// must be in enforced state as a precondition. // must be in enforced state as a precondition.

View File

@ -44,7 +44,9 @@ class InternalTblPropCollFactory {
virtual ~InternalTblPropCollFactory() {} virtual ~InternalTblPropCollFactory() {}
// has to be thread-safe // has to be thread-safe
virtual InternalTblPropColl* CreateInternalTblPropColl( virtual InternalTblPropColl* CreateInternalTblPropColl(
uint32_t column_family_id, int level_at_creation) = 0; uint32_t column_family_id, int level_at_creation, int num_levels,
SequenceNumber last_level_inclusive_max_seqno_threshold =
kMaxSequenceNumber) = 0;
// The name of the properties collector can be used for debugging purpose. // The name of the properties collector can be used for debugging purpose.
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
@ -92,10 +94,15 @@ class UserKeyTablePropertiesCollectorFactory
std::shared_ptr<TablePropertiesCollectorFactory> user_collector_factory) std::shared_ptr<TablePropertiesCollectorFactory> user_collector_factory)
: user_collector_factory_(user_collector_factory) {} : user_collector_factory_(user_collector_factory) {}
InternalTblPropColl* CreateInternalTblPropColl( InternalTblPropColl* CreateInternalTblPropColl(
uint32_t column_family_id, int level_at_creation) override { uint32_t column_family_id, int level_at_creation, int num_levels,
SequenceNumber last_level_inclusive_max_seqno_threshold =
kMaxSequenceNumber) override {
TablePropertiesCollectorFactory::Context context; TablePropertiesCollectorFactory::Context context;
context.column_family_id = column_family_id; context.column_family_id = column_family_id;
context.level_at_creation = level_at_creation; context.level_at_creation = level_at_creation;
context.num_levels = num_levels;
context.last_level_inclusive_max_seqno_threshold =
last_level_inclusive_max_seqno_threshold;
TablePropertiesCollector* collector = TablePropertiesCollector* collector =
user_collector_factory_->CreateTablePropertiesCollector(context); user_collector_factory_->CreateTablePropertiesCollector(context);
if (collector) { if (collector) {

View File

@ -209,7 +209,9 @@ class RegularKeysStartWithAFactory : public InternalTblPropCollFactory,
} }
} }
InternalTblPropColl* CreateInternalTblPropColl( InternalTblPropColl* CreateInternalTblPropColl(
uint32_t /*column_family_id*/, int /* level_at_creation */) override { uint32_t /*column_family_id*/, int /* level_at_creation */,
int /* num_levels */,
SequenceNumber /* last_level_inclusive_max_seqno_threshold */) override {
return new RegularKeysStartWithAInternal(); return new RegularKeysStartWithAInternal();
} }
const char* Name() const override { return "RegularKeysStartWithA"; } const char* Name() const override { return "RegularKeysStartWithA"; }

View File

@ -157,8 +157,25 @@ class TablePropertiesCollectorFactory : public Customizable {
// The level at creating the SST file (i.e, table), of which the // The level at creating the SST file (i.e, table), of which the
// properties are being collected. // properties are being collected.
int level_at_creation = kUnknownLevelAtCreation; int level_at_creation = kUnknownLevelAtCreation;
int num_levels = kUnknownNumLevels;
// In the tiering case, data with seqnos smaller than or equal to this
// cutoff sequence number will be considered by a compaction job as eligible
// to be placed on the last level. When this is the maximum sequence number,
// it indicates tiering is disabled.
SequenceNumber last_level_inclusive_max_seqno_threshold;
static const uint32_t kUnknownColumnFamily; static const uint32_t kUnknownColumnFamily;
static const int kUnknownLevelAtCreation = -1; static const int kUnknownLevelAtCreation = -1;
static const int kUnknownNumLevels = -1;
Context() {}
Context(uint32_t _column_family_id, int _level_at_creation, int _num_levels,
SequenceNumber _last_level_inclusive_max_seqno_threshold)
: column_family_id(_column_family_id),
level_at_creation(_level_at_creation),
num_levels(_num_levels),
last_level_inclusive_max_seqno_threshold(
_last_level_inclusive_max_seqno_threshold) {}
}; };
~TablePropertiesCollectorFactory() override {} ~TablePropertiesCollectorFactory() override {}

View File

@ -84,4 +84,50 @@ std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(size_t sliding_window_size, NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
size_t deletion_trigger, size_t deletion_trigger,
double deletion_ratio = 0); double deletion_ratio = 0);
// A factory of a table property collector that marks a SST file as
// need-compaction when for the tiering use case, it observes, among all the
// data entries, the ratio of entries that are already eligible to be placed on
// the last level but are not yet on the last level is equal to or higher than
// the configured `compaction_trigger_ratio_`.
// 1) Setting the ratio to be equal to or smaller than 0 disables this collector
// 2) Setting the ratio to be within (0, 1] will write the number of
// observed eligible entries into a user property and marks a file as
// need-compaction when aforementioned condition is met.
// 3) Setting the ratio to be higher than 1 can be used to just writes the user
// table property, and not mark any file as need compaction.
// For a column family that does not enable tiering feature, even if an
// effective configuration is provided, this collector is still disabled.
class CompactForTieringCollectorFactory
: public TablePropertiesCollectorFactory {
public:
// @param compaction_trigger_ratio: the triggering threshold for the ratio of
// eligible entries to the total number of entries. See class documentation
// for what entry is eligible.
CompactForTieringCollectorFactory(double compaction_trigger_ratio);
~CompactForTieringCollectorFactory() {}
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) override;
void SetCompactionTriggerRatio(double new_ratio) {
compaction_trigger_ratio_.store(new_ratio);
}
double GetCompactionTriggerRatio() const {
return compaction_trigger_ratio_.load();
}
static const char* kClassName() { return "CompactForTieringCollector"; }
const char* Name() const override { return kClassName(); }
std::string ToString() const override;
private:
std::atomic<double> compaction_trigger_ratio_;
};
std::shared_ptr<CompactForTieringCollectorFactory>
NewCompactForTieringCollectorFactory(double compaction_trigger_ratio);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

2
src.mk
View File

@ -304,6 +304,7 @@ LIB_SOURCES = \
utilities/persistent_cache/volatile_tier_impl.cc \ utilities/persistent_cache/volatile_tier_impl.cc \
utilities/simulator_cache/cache_simulator.cc \ utilities/simulator_cache/cache_simulator.cc \
utilities/simulator_cache/sim_cache.cc \ utilities/simulator_cache/sim_cache.cc \
utilities/table_properties_collectors/compact_for_tiering_collector.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/trace/file_trace_reader_writer.cc \ utilities/trace/file_trace_reader_writer.cc \
utilities/trace/replayer_impl.cc \ utilities/trace/replayer_impl.cc \
@ -628,6 +629,7 @@ TEST_MAIN_SOURCES = \
utilities/persistent_cache/persistent_cache_test.cc \ utilities/persistent_cache/persistent_cache_test.cc \
utilities/simulator_cache/cache_simulator_test.cc \ utilities/simulator_cache/cache_simulator_test.cc \
utilities/simulator_cache/sim_cache_test.cc \ utilities/simulator_cache/sim_cache_test.cc \
utilities/table_properties_collectors/compact_for_tiering_collector_test.cc \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/optimistic_transaction_test.cc \
utilities/transactions/lock/range/range_locking_test.cc \ utilities/transactions/lock/range/range_locking_test.cc \

View File

@ -582,8 +582,10 @@ struct BlockBasedTableBuilder::Rep {
assert(factory); assert(factory);
std::unique_ptr<InternalTblPropColl> collector{ std::unique_ptr<InternalTblPropColl> collector{
factory->CreateInternalTblPropColl(tbo.column_family_id, factory->CreateInternalTblPropColl(
tbo.level_at_creation)}; tbo.column_family_id, tbo.level_at_creation,
tbo.ioptions.num_levels,
tbo.last_level_inclusive_max_seqno_threshold)};
if (collector) { if (collector) {
table_properties_collectors.emplace_back(std::move(collector)); table_properties_collectors.emplace_back(std::move(collector));
} }

View File

@ -119,8 +119,8 @@ PlainTableBuilder::PlainTableBuilder(
assert(factory); assert(factory);
std::unique_ptr<InternalTblPropColl> collector{ std::unique_ptr<InternalTblPropColl> collector{
factory->CreateInternalTblPropColl(column_family_id, factory->CreateInternalTblPropColl(column_family_id, level_at_creation,
level_at_creation)}; ioptions.num_levels)};
if (collector) { if (collector) {
table_properties_collectors_.emplace_back(std::move(collector)); table_properties_collectors_.emplace_back(std::move(collector));
} }

View File

@ -79,7 +79,9 @@ class SstFileWriterPropertiesCollectorFactory
: version_(version), global_seqno_(global_seqno) {} : version_(version), global_seqno_(global_seqno) {}
InternalTblPropColl* CreateInternalTblPropColl( InternalTblPropColl* CreateInternalTblPropColl(
uint32_t /*column_family_id*/, int /* level_at_creation */) override { uint32_t /*column_family_id*/, int /* level_at_creation */,
int /* num_levels */,
SequenceNumber /* last_level_inclusive_max_seqno_threshold */) override {
return new SstFileWriterPropertiesCollector(version_, global_seqno_); return new SstFileWriterPropertiesCollector(version_, global_seqno_);
} }

View File

@ -99,7 +99,7 @@ struct TableReaderOptions {
bool user_defined_timestamps_persisted; bool user_defined_timestamps_persisted;
}; };
struct TableBuilderOptions { struct TableBuilderOptions : public TablePropertiesCollectorFactory::Context {
TableBuilderOptions( TableBuilderOptions(
const ImmutableOptions& _ioptions, const MutableCFOptions& _moptions, const ImmutableOptions& _ioptions, const MutableCFOptions& _moptions,
const ReadOptions& _read_options, const WriteOptions& _write_options, const ReadOptions& _read_options, const WriteOptions& _write_options,
@ -113,8 +113,13 @@ struct TableBuilderOptions {
const int64_t _oldest_key_time = 0, const int64_t _oldest_key_time = 0,
const uint64_t _file_creation_time = 0, const std::string& _db_id = "", const uint64_t _file_creation_time = 0, const std::string& _db_id = "",
const std::string& _db_session_id = "", const std::string& _db_session_id = "",
const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0) const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0,
: ioptions(_ioptions), const SequenceNumber _last_level_inclusive_max_seqno_threshold =
kMaxSequenceNumber)
: TablePropertiesCollectorFactory::Context(
_column_family_id, _level, _ioptions.num_levels,
_last_level_inclusive_max_seqno_threshold),
ioptions(_ioptions),
moptions(_moptions), moptions(_moptions),
read_options(_read_options), read_options(_read_options),
write_options(_write_options), write_options(_write_options),
@ -122,14 +127,12 @@ struct TableBuilderOptions {
internal_tbl_prop_coll_factories(_internal_tbl_prop_coll_factories), internal_tbl_prop_coll_factories(_internal_tbl_prop_coll_factories),
compression_type(_compression_type), compression_type(_compression_type),
compression_opts(_compression_opts), compression_opts(_compression_opts),
column_family_id(_column_family_id),
column_family_name(_column_family_name), column_family_name(_column_family_name),
oldest_key_time(_oldest_key_time), oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size), target_file_size(_target_file_size),
file_creation_time(_file_creation_time), file_creation_time(_file_creation_time),
db_id(_db_id), db_id(_db_id),
db_session_id(_db_session_id), db_session_id(_db_session_id),
level_at_creation(_level),
is_bottommost(_is_bottommost), is_bottommost(_is_bottommost),
reason(_reason), reason(_reason),
cur_file_num(_cur_file_num) {} cur_file_num(_cur_file_num) {}
@ -142,7 +145,6 @@ struct TableBuilderOptions {
const InternalTblPropCollFactories* internal_tbl_prop_coll_factories; const InternalTblPropCollFactories* internal_tbl_prop_coll_factories;
const CompressionType compression_type; const CompressionType compression_type;
const CompressionOptions& compression_opts; const CompressionOptions& compression_opts;
const uint32_t column_family_id;
const std::string& column_family_name; const std::string& column_family_name;
const int64_t oldest_key_time; const int64_t oldest_key_time;
const uint64_t target_file_size; const uint64_t target_file_size;
@ -150,7 +152,6 @@ struct TableBuilderOptions {
const std::string db_id; const std::string db_id;
const std::string db_session_id; const std::string db_session_id;
// BEGIN for FilterBuildingContext // BEGIN for FilterBuildingContext
const int level_at_creation;
const bool is_bottommost; const bool is_bottommost;
const TableFileCreationReason reason; const TableFileCreationReason reason;
// END for FilterBuildingContext // END for FilterBuildingContext

View File

@ -0,0 +1 @@
Added a `CompactForTieringCollectorFactory` to auto trigger compaction for tiering use case.

View File

@ -0,0 +1,146 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/table_properties_collectors/compact_for_tiering_collector.h"
#include <sstream>
#include "db/seqno_to_time_mapping.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
const std::string
CompactForTieringCollector::kNumEligibleLastLevelEntriesPropertyName =
"rocksdb.eligible.last.level.entries";
CompactForTieringCollector::CompactForTieringCollector(
SequenceNumber last_level_inclusive_max_seqno_threshold,
double compaction_trigger_ratio)
: last_level_inclusive_max_seqno_threshold_(
last_level_inclusive_max_seqno_threshold),
compaction_trigger_ratio_(compaction_trigger_ratio) {
assert(last_level_inclusive_max_seqno_threshold_ != kMaxSequenceNumber);
}
Status CompactForTieringCollector::AddUserKey(const Slice& /*key*/,
const Slice& value,
EntryType type,
SequenceNumber seq,
uint64_t /*file_size*/) {
SequenceNumber seq_for_check = seq;
if (type == kEntryTimedPut) {
seq_for_check = ParsePackedValueForSeqno(value);
}
if (seq_for_check < last_level_inclusive_max_seqno_threshold_) {
last_level_eligible_entries_counter_++;
}
total_entries_counter_ += 1;
return Status::OK();
}
Status CompactForTieringCollector::Finish(UserCollectedProperties* properties) {
assert(!finish_called_);
assert(compaction_trigger_ratio_ > 0);
if (last_level_eligible_entries_counter_ >=
compaction_trigger_ratio_ * total_entries_counter_) {
assert(compaction_trigger_ratio_ <= 1);
need_compaction_ = true;
}
if (last_level_eligible_entries_counter_ > 0) {
*properties = UserCollectedProperties{
{kNumEligibleLastLevelEntriesPropertyName,
std::to_string(last_level_eligible_entries_counter_)},
};
}
finish_called_ = true;
return Status::OK();
}
UserCollectedProperties CompactForTieringCollector::GetReadableProperties()
const {
return UserCollectedProperties{
{kNumEligibleLastLevelEntriesPropertyName,
std::to_string(last_level_eligible_entries_counter_)},
};
}
bool CompactForTieringCollector::NeedCompact() const {
return need_compaction_;
}
void CompactForTieringCollector::Reset() {
last_level_eligible_entries_counter_ = 0;
total_entries_counter_ = 0;
finish_called_ = false;
need_compaction_ = false;
}
TablePropertiesCollector*
CompactForTieringCollectorFactory::CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) {
double compaction_trigger_ratio = GetCompactionTriggerRatio();
if (compaction_trigger_ratio <= 0 ||
context.level_at_creation == context.num_levels - 1 ||
context.last_level_inclusive_max_seqno_threshold == kMaxSequenceNumber) {
return nullptr;
}
return new CompactForTieringCollector(
context.last_level_inclusive_max_seqno_threshold,
compaction_trigger_ratio);
}
static std::unordered_map<std::string, OptionTypeInfo>
on_compact_for_tiering_type_info = {
{"compaction_trigger_ratio",
{0, OptionType::kUnknown, OptionVerificationType::kNormal,
OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable,
[](const ConfigOptions&, const std::string&, const std::string& value,
void* addr) {
auto* factory =
static_cast<CompactForTieringCollectorFactory*>(addr);
factory->SetCompactionTriggerRatio(ParseDouble(value));
return Status::OK();
},
[](const ConfigOptions&, const std::string&, const void* addr,
std::string* value) {
const auto* factory =
static_cast<const CompactForTieringCollectorFactory*>(addr);
*value = std::to_string(factory->GetCompactionTriggerRatio());
return Status::OK();
},
nullptr}},
};
CompactForTieringCollectorFactory::CompactForTieringCollectorFactory(
double compaction_trigger_ratio)
: compaction_trigger_ratio_(compaction_trigger_ratio) {
RegisterOptions("", this, &on_compact_for_tiering_type_info);
}
std::string CompactForTieringCollectorFactory::ToString() const {
std::ostringstream cfg;
cfg << Name()
<< ", compaction trigger ratio:" << compaction_trigger_ratio_.load()
<< std::endl;
return cfg.str();
}
std::shared_ptr<CompactForTieringCollectorFactory>
NewCompactForTieringCollectorFactory(double compaction_trigger_ratio) {
return std::make_shared<CompactForTieringCollectorFactory>(
compaction_trigger_ratio);
return std::shared_ptr<CompactForTieringCollectorFactory>(
new CompactForTieringCollectorFactory(compaction_trigger_ratio));
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,45 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/utilities/table_properties_collectors.h"
namespace ROCKSDB_NAMESPACE {
// A user property collector that marks a SST file as need-compaction when for
// the tiering use case. See documentation for
// `CompactForTieringCollectorFactory`.
class CompactForTieringCollector : public TablePropertiesCollector {
public:
static const std::string kNumEligibleLastLevelEntriesPropertyName;
CompactForTieringCollector(
SequenceNumber last_level_inclusive_max_seqno_threshold_,
double compaction_trigger_ratio);
Status AddUserKey(const Slice& key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override;
Status Finish(UserCollectedProperties* properties) override;
UserCollectedProperties GetReadableProperties() const override;
const char* Name() const override { return "CompactForTieringCollector"; }
bool NeedCompact() const override;
private:
void Reset();
SequenceNumber last_level_inclusive_max_seqno_threshold_;
double compaction_trigger_ratio_;
size_t last_level_eligible_entries_counter_ = 0;
size_t total_entries_counter_ = 0;
bool finish_called_ = false;
bool need_compaction_ = false;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,139 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/table_properties_collectors/compact_for_tiering_collector.h"
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <iostream>
#include <vector>
#include "db/seqno_to_time_mapping.h"
#include "port/stack_trace.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/testharness.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
TEST(CompactForTieringCollector, NotEnabled) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id = 1;
context.level_at_creation = 1;
context.num_levels = 6;
context.last_level_inclusive_max_seqno_threshold = 50;
// Set compaction trigger ratio to 0 to disable it. No collector created.
auto factory = NewCompactForTieringCollectorFactory(0);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
ASSERT_EQ(nullptr, collector);
}
TEST(CompactForTieringCollector, TieringDisabled) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id = 1;
context.level_at_creation = 1;
context.num_levels = 6;
context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber;
// Tiering is disabled on the column family. No collector created.
{
for (double compaction_trigger_ratio : {0.0, 0.1, 1.0, 1.5}) {
auto factory =
NewCompactForTieringCollectorFactory(compaction_trigger_ratio);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
ASSERT_EQ(nullptr, collector);
}
}
}
TEST(CompactForTieringCollector, LastLevelFile) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id = 1;
context.level_at_creation = 5;
context.num_levels = 6;
context.last_level_inclusive_max_seqno_threshold = 50;
// No collector created for a file that is already on the last level.
{
for (double compaction_trigger_ratio : {0.0, 0.1, 1.0, 1.5}) {
auto factory =
NewCompactForTieringCollectorFactory(compaction_trigger_ratio);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
ASSERT_EQ(nullptr, collector);
}
}
}
TEST(CompactForTieringCollector, CollectorEnabled) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id = 1;
context.level_at_creation = 1;
context.num_levels = 6;
context.last_level_inclusive_max_seqno_threshold = 50;
const size_t kTotalEntries = 100;
{
for (double compaction_trigger_ratio : {0.1, 0.33333333, 0.5, 1.0, 1.5}) {
auto factory =
NewCompactForTieringCollectorFactory(compaction_trigger_ratio);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
for (size_t i = 0; i < kTotalEntries; i++) {
ASSERT_OK(collector->AddUserKey("hello", "rocksdb", kEntryPut, i, 0));
ASSERT_FALSE(collector->NeedCompact());
}
UserCollectedProperties user_properties;
ASSERT_OK(collector->Finish(&user_properties));
ASSERT_EQ(user_properties[CompactForTieringCollector::
kNumEligibleLastLevelEntriesPropertyName],
std::to_string(50));
if (compaction_trigger_ratio > 0.5) {
ASSERT_FALSE(collector->NeedCompact());
} else {
ASSERT_TRUE(collector->NeedCompact());
}
}
}
}
TEST(CompactForTieringCollector, TimedPutEntries) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id = 1;
context.level_at_creation = 1;
context.num_levels = 6;
context.last_level_inclusive_max_seqno_threshold = 50;
const size_t kTotalEntries = 100;
auto factory = NewCompactForTieringCollectorFactory(0.1);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
for (size_t i = 0; i < kTotalEntries; i++) {
std::string value;
PackValueAndSeqno("rocksdb", i, &value);
ASSERT_OK(collector->AddUserKey("hello", value, kEntryTimedPut, 0, 0));
ASSERT_FALSE(collector->NeedCompact());
}
UserCollectedProperties user_properties;
ASSERT_OK(collector->Finish(&user_properties));
ASSERT_EQ(user_properties[CompactForTieringCollector::
kNumEligibleLastLevelEntriesPropertyName],
std::to_string(50));
ASSERT_TRUE(collector->NeedCompact());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -188,6 +188,7 @@ NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
new CompactOnDeletionCollectorFactory(sliding_window_size, new CompactOnDeletionCollectorFactory(sliding_window_size,
deletion_trigger, deletion_ratio)); deletion_trigger, deletion_ratio));
} }
namespace { namespace {
static int RegisterTablePropertiesCollectorFactories( static int RegisterTablePropertiesCollectorFactories(
ObjectLibrary& library, const std::string& /*arg*/) { ObjectLibrary& library, const std::string& /*arg*/) {
@ -202,6 +203,17 @@ static int RegisterTablePropertiesCollectorFactories(
guard->reset(new CompactOnDeletionCollectorFactory(0, 0, 0)); guard->reset(new CompactOnDeletionCollectorFactory(0, 0, 0));
return guard->get(); return guard->get();
}); });
library.AddFactory<TablePropertiesCollectorFactory>(
CompactForTieringCollectorFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<TablePropertiesCollectorFactory>* guard,
std::string* /* errmsg */) {
// By default, create a `CompactForTieringCollectorFactory` that is
// disabled. Users will need to call corresponding setters to enable
// the factory.
guard->reset(new CompactForTieringCollectorFactory(0));
return guard->get();
});
return 1; return 1;
} }
} // namespace } // namespace

View File

@ -14,6 +14,7 @@
#include <cstdio> #include <cstdio>
#include <vector> #include <vector>
#include "db/dbformat.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
@ -27,6 +28,7 @@ TEST(CompactOnDeletionCollector, DeletionRatio) {
TablePropertiesCollectorFactory::Context context; TablePropertiesCollectorFactory::Context context;
context.column_family_id = context.column_family_id =
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber;
const size_t kTotalEntries = 100; const size_t kTotalEntries = 100;
{ {
@ -86,6 +88,7 @@ TEST(CompactOnDeletionCollector, SlidingWindow) {
TablePropertiesCollectorFactory::Context context; TablePropertiesCollectorFactory::Context context;
context.column_family_id = context.column_family_id =
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
context.last_level_inclusive_max_seqno_threshold = kMaxSequenceNumber;
std::vector<int> window_sizes; std::vector<int> window_sizes;
std::vector<int> deletion_triggers; std::vector<int> deletion_triggers;