Allow penultimate level output for the last level only compaction (#10822)

Summary:
Allow the last level only compaction able to output result to penultimate level if the penultimate level is empty. Which will also block the other compaction output to the penultimate level.
(it includes the PR https://github.com/facebook/rocksdb/issues/10829)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10822

Reviewed By: siying

Differential Revision: D40389180

Pulled By: jay-zhuang

fbshipit-source-id: 4e5dcdce307795b5e07b5dd1fa29dd75bb093bad
This commit is contained in:
Jay Zhuang 2022-10-22 08:57:38 -07:00 committed by Facebook GitHub Bot
parent 27c9705ac4
commit f726d29a82
11 changed files with 1002 additions and 44 deletions

View File

@ -6,9 +6,10 @@
* Tiered Storage: allow data moving up from the last level to the penultimate level if the input level is penultimate level or above.
* Added `DB::Properties::kFastBlockCacheEntryStats`, which is similar to `DB::Properties::kBlockCacheEntryStats`, except returns cached (stale) values in more cases to reduce overhead.
* FIFO compaction now supports migrating from a multi-level DB via DB::Open(). During the migration phase, FIFO compaction picker will:
* picks the sst file with the smallest starting key in the bottom-most non-empty level.
* Note that during the migration phase, the file purge order will only be an approximation of "FIFO" as files in lower-level might sometime contain newer keys than files in upper-level.
* picks the sst file with the smallest starting key in the bottom-most non-empty level.
* Note that during the migration phase, the file purge order will only be an approximation of "FIFO" as files in lower-level might sometime contain newer keys than files in upper-level.
* Added an option `ignore_max_compaction_bytes_for_input` to ignore max_compaction_bytes limit when adding files to be compacted from input level. This should help reduce write amplification. The option is enabled by default.
* Tiered Storage: allow data moving up from the last level even if it's a last level only compaction, as long as the penultimate level is empty.
### Bug Fixes
* Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed.

View File

@ -259,7 +259,7 @@ Compaction::Compaction(
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(EvaluatePenultimateLevel(
immutable_options_, start_level_, output_level_)) {
vstorage, immutable_options_, start_level_, output_level_)) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
@ -322,13 +322,67 @@ void Compaction::PopulatePenultimateLevelOutputRange() {
return;
}
int exclude_level =
immutable_options_.compaction_style == kCompactionStyleUniversal
? kInvalidLevel
: number_levels_ - 1;
// exclude the last level, the range of all input levels is the safe range
// of keys that can be moved up.
int exclude_level = number_levels_ - 1;
penultimate_output_range_type_ = PenultimateOutputRangeType::kNonLastRange;
// For universal compaction, the penultimate_output_range could be extended if
// all penultimate level files are included in the compaction (which includes
// the case that the penultimate level is empty).
if (immutable_options_.compaction_style == kCompactionStyleUniversal) {
exclude_level = kInvalidLevel;
std::set<uint64_t> penultimate_inputs;
for (const auto& input_lvl : inputs_) {
if (input_lvl.level == penultimate_level_) {
for (const auto& file : input_lvl.files) {
penultimate_inputs.emplace(file->fd.GetNumber());
}
}
}
auto penultimate_files = input_vstorage_->LevelFiles(penultimate_level_);
for (const auto& file : penultimate_files) {
if (penultimate_inputs.find(file->fd.GetNumber()) ==
penultimate_inputs.end()) {
exclude_level = number_levels_ - 1;
penultimate_output_range_type_ = PenultimateOutputRangeType::kFullRange;
break;
}
}
}
GetBoundaryKeys(input_vstorage_, inputs_,
&penultimate_level_smallest_user_key_,
&penultimate_level_largest_user_key_, exclude_level);
// If there's a case that the penultimate level output range is overlapping
// with the existing files, disable the penultimate level output by setting
// the range to empty. One example is the range delete could have overlap
// boundary with the next file. (which is actually a false overlap)
// TODO: Exclude such false overlap, so it won't disable the penultimate
// output.
std::set<uint64_t> penultimate_inputs;
for (const auto& input_lvl : inputs_) {
if (input_lvl.level == penultimate_level_) {
for (const auto& file : input_lvl.files) {
penultimate_inputs.emplace(file->fd.GetNumber());
}
}
}
auto penultimate_files = input_vstorage_->LevelFiles(penultimate_level_);
for (const auto& file : penultimate_files) {
if (penultimate_inputs.find(file->fd.GetNumber()) ==
penultimate_inputs.end() &&
OverlapPenultimateLevelOutputRange(file->smallest.user_key(),
file->largest.user_key())) {
// basically disable the penultimate range output. which should be rare
// or a false overlap caused by range del
penultimate_level_smallest_user_key_ = "";
penultimate_level_largest_user_key_ = "";
penultimate_output_range_type_ = PenultimateOutputRangeType::kDisabled;
}
}
}
Compaction::~Compaction() {
@ -368,6 +422,11 @@ bool Compaction::WithinPenultimateLevelOutputRange(const Slice& key) const {
return false;
}
if (penultimate_level_smallest_user_key_.empty() ||
penultimate_level_largest_user_key_.empty()) {
return false;
}
const Comparator* ucmp =
input_vstorage_->InternalComparator()->user_comparator();
@ -749,6 +808,7 @@ uint64_t Compaction::MinInputFileOldestAncesterTime(
}
int Compaction::EvaluatePenultimateLevel(
const VersionStorageInfo* vstorage,
const ImmutableOptions& immutable_options, const int start_level,
const int output_level) {
// TODO: currently per_key_placement feature only support level and universal
@ -763,7 +823,19 @@ int Compaction::EvaluatePenultimateLevel(
int penultimate_level = output_level - 1;
assert(penultimate_level < immutable_options.num_levels);
if (penultimate_level <= 0 || penultimate_level < start_level) {
if (penultimate_level <= 0) {
return kInvalidLevel;
}
// If the penultimate level is not within input level -> output level range
// check if the penultimate output level is empty, if it's empty, it could
// also be locked for the penultimate output.
// TODO: ideally, it only needs to check if there's a file within the
// compaction output key range. For simplicity, it just check if there's any
// file on the penultimate level.
if (start_level == immutable_options.num_levels - 1 &&
(immutable_options.compaction_style != kCompactionStyleUniversal ||
!vstorage->LevelFiles(penultimate_level).empty())) {
return kInvalidLevel;
}

View File

@ -87,6 +87,15 @@ class Compaction {
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
kNotSupported, // it cannot output to the penultimate level
kFullRange, // any data could be output to the penultimate level
kNonLastRange, // only the keys within non_last_level compaction inputs can
// be outputted to the penultimate level
kDisabled, // no data can be outputted to the penultimate level
};
// No copying allowed
Compaction(const Compaction&) = delete;
void operator=(const Compaction&) = delete;
@ -310,6 +319,18 @@ class Compaction {
Slice GetLargestUserKey() const { return largest_user_key_; }
Slice GetPenultimateLevelSmallestUserKey() const {
return penultimate_level_smallest_user_key_;
}
Slice GetPenultimateLevelLargestUserKey() const {
return penultimate_level_largest_user_key_;
}
PenultimateOutputRangeType GetPenultimateOutputRangeType() const {
return penultimate_output_range_type_;
}
// Return true if the compaction supports per_key_placement
bool SupportsPerKeyPlacement() const;
@ -369,11 +390,18 @@ class Compaction {
}
static constexpr int kInvalidLevel = -1;
// Evaluate penultimate output level. If the compaction supports
// per_key_placement feature, it returns the penultimate level number.
// Otherwise, it's set to kInvalidLevel (-1), which means
// output_to_penultimate_level is not supported.
static int EvaluatePenultimateLevel(const ImmutableOptions& immutable_options,
// Note: even the penultimate level output is supported (PenultimateLevel !=
// kInvalidLevel), some key range maybe unsafe to be outputted to the
// penultimate level. The safe key range is populated by
// `PopulatePenultimateLevelOutputRange()`.
// Which could potentially disable all penultimate level output.
static int EvaluatePenultimateLevel(const VersionStorageInfo* vstorage,
const ImmutableOptions& immutable_options,
const int start_level,
const int output_level);
@ -390,11 +418,6 @@ class Compaction {
// populate penultimate level output range, which will be used to determine if
// a key is safe to output to the penultimate level (details see
// `Compaction::WithinPenultimateLevelOutputRange()`.
// TODO: Currently the penultimate level output range is the min/max keys of
// non-last-level input files. Which is only good if there's no key moved
// from the last level to the penultimate level. For a more complicated per
// key placement which may move data from the last level to the penultimate
// level, it needs extra check.
void PopulatePenultimateLevelOutputRange();
// Get the atomic file boundaries for all files in the compaction. Necessary
@ -503,8 +526,11 @@ class Compaction {
// Key range for penultimate level output
// includes timestamp if user-defined timestamp is enabled.
// penultimate_output_range_type_ shows the range type
Slice penultimate_level_smallest_user_key_;
Slice penultimate_level_largest_user_key_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
};
#ifndef NDEBUG

View File

@ -107,6 +107,23 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
}
}
const char* GetCompactionPenultimateOutputRangeTypeString(
Compaction::PenultimateOutputRangeType range_type) {
switch (range_type) {
case Compaction::PenultimateOutputRangeType::kNotSupported:
return "NotSupported";
case Compaction::PenultimateOutputRangeType::kFullRange:
return "FullRange";
case Compaction::PenultimateOutputRangeType::kNonLastRange:
return "NonLastRange";
case Compaction::PenultimateOutputRangeType::kDisabled:
return "Disabled";
default:
assert(false);
return "Invalid";
}
}
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@ -1261,6 +1278,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
};
Status status;
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::ProcessKeyValueCompaction()::Processing",
reinterpret_cast<void*>(
const_cast<Compaction*>(sub_compact->compaction)));
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
@ -1976,7 +1997,7 @@ void CompactionJob::LogCompaction() {
compaction->InputLevelSummary(&inputs_summary), compaction->score());
char scratch[2345];
compaction->Summary(scratch, sizeof(scratch));
ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
@ -1997,6 +2018,23 @@ void CompactionJob::LogCompaction() {
<< (existing_snapshots_.empty()
? int64_t{-1} // Use -1 for "none"
: static_cast<int64_t>(existing_snapshots_[0]));
if (compaction->SupportsPerKeyPlacement()) {
stream << "preclude_last_level_min_seqno"
<< preclude_last_level_min_seqno_;
stream << "penultimate_output_level" << compaction->GetPenultimateLevel();
stream << "penultimate_output_range"
<< GetCompactionPenultimateOutputRangeTypeString(
compaction->GetPenultimateOutputRangeType());
if (compaction->GetPenultimateOutputRangeType() ==
Compaction::PenultimateOutputRangeType::kDisabled) {
ROCKS_LOG_WARN(
db_options_.info_log,
"[%s] [JOB %d] Penultimate level output is disabled, likely "
"because of the range conflict in the penultimate level",
cfd->GetName().c_str(), job_id_);
}
}
}
}

View File

@ -294,13 +294,12 @@ bool CompactionPicker::RangeOverlapWithCompaction(
}
bool CompactionPicker::FilesRangeOverlapWithCompaction(
const std::vector<CompactionInputFiles>& inputs, int level) const {
const std::vector<CompactionInputFiles>& inputs, int level,
int penultimate_level) const {
bool is_empty = true;
int start_level = -1;
for (auto& in : inputs) {
if (!in.empty()) {
is_empty = false;
start_level = in.level; // inputs are sorted by level
break;
}
}
@ -309,10 +308,10 @@ bool CompactionPicker::FilesRangeOverlapWithCompaction(
return false;
}
// TODO: Intra L0 compactions can have the ranges overlapped, but the input
// files cannot be overlapped in the order of L0 files.
InternalKey smallest, largest;
GetRange(inputs, &smallest, &largest, Compaction::kInvalidLevel);
int penultimate_level =
Compaction::EvaluatePenultimateLevel(ioptions_, start_level, level);
if (penultimate_level != Compaction::kInvalidLevel) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
if (RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
@ -350,11 +349,25 @@ Compaction* CompactionPicker::CompactFiles(
const std::vector<CompactionInputFiles>& input_files, int output_level,
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, uint32_t output_path_id) {
#ifndef NDEBUG
assert(input_files.size());
// This compaction output should not overlap with a running compaction as
// `SanitizeCompactionInputFiles` should've checked earlier and db mutex
// shouldn't have been released since.
assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
int start_level = Compaction::kInvalidLevel;
for (const auto& in : input_files) {
// input_files should already be sorted by level
if (!in.empty()) {
start_level = in.level;
break;
}
}
assert(output_level == 0 ||
!FilesRangeOverlapWithCompaction(
input_files, output_level,
Compaction::EvaluatePenultimateLevel(vstorage, ioptions_,
start_level, output_level)));
#endif /* !NDEBUG */
CompressionType compression_type;
if (compact_options.compression == kDisableCompressionOption) {
@ -652,7 +665,10 @@ Compaction* CompactionPicker::CompactRange(
// 2 non-exclusive manual compactions could run at the same time producing
// overlaping outputs in the same level.
if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
if (FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluatePenultimateLevel(vstorage, ioptions_,
start_level, output_level))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
*manual_conflict = true;
@ -831,7 +847,10 @@ Compaction* CompactionPicker::CompactRange(
// 2 non-exclusive manual compactions could run at the same time producing
// overlaping outputs in the same level.
if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
if (FilesRangeOverlapWithCompaction(
compaction_inputs, output_level,
Compaction::EvaluatePenultimateLevel(vstorage, ioptions_, input_level,
output_level))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
*manual_conflict = true;
@ -1116,7 +1135,8 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
}
assert(ioptions_.compaction_style != kCompactionStyleLevel ||
c->output_level() == 0 ||
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level()));
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
c->GetPenultimateLevel()));
if (c->start_level() == 0 ||
ioptions_.compaction_style == kCompactionStyleUniversal) {
level0_compactions_in_progress_.insert(c);

View File

@ -182,7 +182,8 @@ class CompactionPicker {
// Returns true if the key range that `inputs` files cover overlap with the
// key range of a currently running compaction.
bool FilesRangeOverlapWithCompaction(
const std::vector<CompactionInputFiles>& inputs, int level) const;
const std::vector<CompactionInputFiles>& inputs, int level,
int penultimate_level) const;
bool SetupOtherInputs(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,

View File

@ -379,7 +379,9 @@ void LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion() {
if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&tmp_start_level_inputs) ||
compaction_picker_->FilesRangeOverlapWithCompaction(
{tmp_start_level_inputs}, output_level_)) {
{tmp_start_level_inputs}, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// Constraint 1a
tmp_start_level_inputs.clear();
return;
@ -453,7 +455,9 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(
compaction_inputs_, output_level_)) {
compaction_inputs_, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
@ -755,7 +759,9 @@ bool LevelCompactionBuilder::PickFileToCompact() {
if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_) ||
compaction_picker_->FilesRangeOverlapWithCompaction(
{start_level_inputs_}, output_level_)) {
{start_level_inputs_}, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// A locked (pending compaction) input-level file was pulled in due to
// user-key overlap.
start_level_inputs_.clear();

View File

@ -3149,7 +3149,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedL0Overlap2) {
// should fail
NewVersionStorage(1, kCompactionStyleUniversal);
// Mark file number 4 for compaction
// Mark file number 5 for compaction
Add(0, 4U, "260", "300", 1 * kFileSize, 0, 260, 300);
Add(0, 5U, "240", "290", 2 * kFileSize, 0, 201, 250, 0, true);
Add(0, 3U, "301", "350", 4 * kFileSize, 0, 101, 150);
@ -3524,9 +3524,11 @@ TEST_P(PerKeyPlacementCompactionPickerTest, OverlapWithNormalCompaction) {
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(
enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(input_files, 6));
ASSERT_EQ(enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 6,
Compaction::EvaluatePenultimateLevel(vstorage_.get(), ioptions_,
0, 6)));
}
TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlap) {
@ -3567,9 +3569,9 @@ TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlap) {
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(
enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(input_files, 5));
ASSERT_EQ(enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5, Compaction::kInvalidLevel));
}
TEST_P(PerKeyPlacementCompactionPickerTest,
@ -3612,7 +3614,9 @@ TEST_P(PerKeyPlacementCompactionPickerTest,
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 6));
input_files, 6,
Compaction::EvaluatePenultimateLevel(vstorage_.get(), ioptions_,
0, 6)));
}
TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlapUniversal) {
@ -3656,7 +3660,7 @@ TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlapUniversal) {
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5));
input_files, 5, Compaction::kInvalidLevel));
}
TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
@ -3677,7 +3681,7 @@ TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(4, 42U, "360", "380", 60000000U);
Add(6, 50U, "101", "351", 60000000U);
Add(6, 60U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
// the existing compaction is the 1st L4 file + L6 file
@ -3686,7 +3690,7 @@ TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(40);
input_set.insert(50);
input_set.insert(60);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
@ -3703,7 +3707,7 @@ TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5));
input_files, 5, Compaction::kInvalidLevel));
// compacting the 3rd L4 file is always safe:
input_set.clear();
@ -3713,7 +3717,237 @@ TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_FALSE(universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5));
input_files, 5, Compaction::kInvalidLevel));
}
TEST_P(PerKeyPlacementCompactionPickerTest, LastLevelOnlyOverlapUniversal) {
if (enable_per_key_placement_) {
ioptions_.preclude_last_level_data_seconds = 10000;
}
int num_levels = ioptions_.num_levels;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
// L4: [200, 220] [230, 250] [360, 380]
// L5:
// L6: [101, 351]
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(4, 42U, "360", "380", 60000000U);
Add(6, 60U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(60);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
// cannot compact file 41 if the preclude_last_level feature is on, otherwise
// compact file 41 is okay.
input_set.clear();
input_files.clear();
input_set.insert(41);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5, Compaction::kInvalidLevel));
// compacting the 3rd L4 file is always safe:
input_set.clear();
input_files.clear();
input_set.insert(42);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_FALSE(universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5, Compaction::kInvalidLevel));
}
TEST_P(PerKeyPlacementCompactionPickerTest,
LastLevelOnlyFailPenultimateUniversal) {
// This is to test last_level only compaction still unable to do the
// penultimate level compaction if there's already a file in the penultimate
// level.
// This should rarely happen in universal compaction, as the non-empty L5
// should be included in the compaction.
if (enable_per_key_placement_) {
ioptions_.preclude_last_level_data_seconds = 10000;
}
int num_levels = ioptions_.num_levels;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
// L4: [200, 220]
// L5: [230, 250]
// L6: [101, 351]
Add(4, 40U, "200", "220", 60000000U);
Add(5, 50U, "230", "250", 60000000U);
Add(6, 60U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(60);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp1);
ASSERT_EQ(comp1->GetPenultimateLevel(), Compaction::kInvalidLevel);
// As comp1 cannot be output to the penultimate level, compacting file 40 to
// L5 is always safe.
input_set.clear();
input_files.clear();
input_set.insert(40);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_FALSE(universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5, Compaction::kInvalidLevel));
std::unique_ptr<Compaction> comp2(universal_compaction_picker.CompactFiles(
comp_options, input_files, 5, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp2);
ASSERT_EQ(Compaction::kInvalidLevel, comp2->GetPenultimateLevel());
}
TEST_P(PerKeyPlacementCompactionPickerTest,
LastLevelOnlyConflictWithOngoingUniversal) {
// This is to test last_level only compaction still unable to do the
// penultimate level compaction if there's already an ongoing compaction to
// the penultimate level
if (enable_per_key_placement_) {
ioptions_.preclude_last_level_data_seconds = 10000;
}
int num_levels = ioptions_.num_levels;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
// L4: [200, 220] [230, 250] [360, 380]
// L5:
// L6: [101, 351]
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(4, 42U, "360", "380", 60000000U);
Add(6, 60U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
// create an ongoing compaction to L5 (penultimate level)
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(40);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 5, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp1);
ASSERT_EQ(comp1->GetPenultimateLevel(), Compaction::kInvalidLevel);
input_set.clear();
input_files.clear();
input_set.insert(60);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 6,
Compaction::EvaluatePenultimateLevel(vstorage_.get(), ioptions_,
6, 6)));
if (!enable_per_key_placement_) {
std::unique_ptr<Compaction> comp2(universal_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp2);
ASSERT_EQ(Compaction::kInvalidLevel, comp2->GetPenultimateLevel());
}
}
TEST_P(PerKeyPlacementCompactionPickerTest,
LastLevelOnlyNoConflictWithOngoingUniversal) {
// This is similar to `LastLevelOnlyConflictWithOngoingUniversal`, the only
// change is the ongoing compaction to L5 has no overlap with the last level
// compaction, so it's safe to move data from the last level to the
// penultimate level.
if (enable_per_key_placement_) {
ioptions_.preclude_last_level_data_seconds = 10000;
}
int num_levels = ioptions_.num_levels;
ioptions_.compaction_style = kCompactionStyleUniversal;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
// L4: [200, 220] [230, 250] [360, 380]
// L5:
// L6: [101, 351]
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(4, 42U, "360", "380", 60000000U);
Add(6, 60U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
// create an ongoing compaction to L5 (penultimate level)
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(42);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 5, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp1);
ASSERT_EQ(comp1->GetPenultimateLevel(), Compaction::kInvalidLevel);
input_set.clear();
input_files.clear();
input_set.insert(60);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
// always safe to move data up
ASSERT_FALSE(universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 6,
Compaction::EvaluatePenultimateLevel(vstorage_.get(), ioptions_, 6, 6)));
// 2 compactions can be run in parallel
std::unique_ptr<Compaction> comp2(universal_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
ASSERT_TRUE(comp2);
if (enable_per_key_placement_) {
ASSERT_NE(Compaction::kInvalidLevel, comp2->GetPenultimateLevel());
} else {
ASSERT_EQ(Compaction::kInvalidLevel, comp2->GetPenultimateLevel());
}
}
INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompactionPickerTest,

View File

@ -743,6 +743,13 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level);
}
if (output_level != 0 &&
picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_,
start_level, output_level))) {
return nullptr;
}
CompactionReason compaction_reason;
if (max_number_of_files_to_compact == UINT_MAX) {
compaction_reason = CompactionReason::kUniversalSizeRatio;
@ -1081,6 +1088,24 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
inputs.push_back(second_last_level_inputs);
inputs.push_back(bottom_level_inputs);
int start_level = Compaction::kInvalidLevel;
for (const auto& in : inputs) {
if (!in.empty()) {
// inputs should already be sorted by level
start_level = in.level;
break;
}
}
// intra L0 compactions outputs could have overlap
if (output_level != 0 &&
picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_,
start_level, output_level))) {
return nullptr;
}
// TODO support multi paths?
uint32_t path_id = 0;
return new Compaction(
@ -1210,7 +1235,10 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
if (!output_level_inputs.empty()) {
inputs.push_back(output_level_inputs);
}
if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
if (picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level, output_level))) {
return nullptr;
}
@ -1312,6 +1340,15 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
output_level = sorted_runs_[end_index + 1].level - 1;
}
// intra L0 compactions outputs could have overlap
if (output_level != 0 &&
picker_->FilesRangeOverlapWithCompaction(
inputs, output_level,
Compaction::EvaluatePenultimateLevel(vstorage_, ioptions_,
start_level, output_level))) {
return nullptr;
}
// We never check size for
// compaction_options_universal.compression_size_percent,
// because we always compact all the files, so always compress.

View File

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/mock_time_env.h"
@ -416,7 +417,7 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) {
// No data is moved from cold tier to hot tier because no input files from L5
// or higher, it's not safe to move data to output_to_penultimate_level level.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
// Add 2 keys in higher level, but in separated files, all keys can be moved
// up if it's hot
@ -1488,6 +1489,528 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) {
Close();
}
TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kKeyPerSec = 10;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preserve_internal_time_seconds = 2000;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// pass some time first, otherwise the first a few keys write time are going
// to be zero, and internally zero has special meaning: kUnknownSeqnoTime
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); });
int sst_num = 0;
// Write files that are overlap and enough to trigger compaction
for (; sst_num < kNumTrigger; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeridicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(true));
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// enable preclude feature
options.preclude_last_level_data_seconds = 2000;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// some data are moved up, some are not
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(),
std::numeric_limits<size_t>::max(),
&key_versions));
// make sure there're more than 300 keys and first 100 keys are having seqno
// zeroed out, the last 100 key seqno not zeroed out
ASSERT_GT(key_versions.size(), 300);
for (int i = 0; i < 100; i++) {
ASSERT_EQ(key_versions[i].sequence, 0);
}
auto rit = key_versions.rbegin();
for (int i = 0; i < 100; i++) {
ASSERT_GT(rit->sequence, 0);
rit++;
}
Close();
}
class PrecludeLastLevelTestWithParms
: public PrecludeLastLevelTest,
public testing::WithParamInterface<bool> {
public:
PrecludeLastLevelTestWithParms() : PrecludeLastLevelTest() {}
};
TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kKeyPerSec = 10;
bool enable_preclude_last_level = GetParam();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preserve_internal_time_seconds = 2000;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// pass some time first, otherwise the first a few keys write time are going
// to be zero, and internally zero has special meaning: kUnknownSeqnoTime
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); });
Random rnd(301);
int sst_num = 0;
// Write files that are overlap and enough to trigger compaction
for (; sst_num < kNumTrigger; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100)));
dbfull()->TEST_WaitForPeridicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(true));
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
std::atomic_bool is_manual_compaction_running = false;
std::atomic_bool verified_compaction_order = false;
// Make sure the manual compaction is in progress and try to trigger a
// SizeRatio compaction by flushing 4 files to L0. The compaction will try to
// compact 4 files at L0 to L5 (the last empty level).
// If the preclude_last_feature is enabled, the auto triggered compaction
// cannot be picked. Otherwise, the auto triggered compaction can run in
// parallel with the last level compaction.
// L0: [a] [b] [c] [d]
// L5: (locked if preclude_last_level is enabled)
// L6: [z] (locked: manual compaction in progress)
// TODO: in this case, L0 files should just be compacted to L4, so the 2
// compactions won't be overlapped.
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
if (compaction->is_manual_compaction()) {
is_manual_compaction_running = true;
TEST_SYNC_POINT(
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"ManualCompaction1");
TEST_SYNC_POINT(
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"ManualCompaction2");
is_manual_compaction_running = false;
}
});
SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
if (enable_preclude_last_level && is_manual_compaction_running) {
ASSERT_TRUE(compaction == nullptr);
verified_compaction_order = true;
} else {
ASSERT_TRUE(compaction != nullptr);
verified_compaction_order = true;
}
if (!compaction || !compaction->is_manual_compaction()) {
TEST_SYNC_POINT(
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"AutoCompactionPicked");
}
});
SyncPoint::GetInstance()->LoadDependency({
{"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"ManualCompaction1",
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite"},
{"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"AutoCompactionPicked",
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:"
"ManualCompaction2"},
});
SyncPoint::GetInstance()->EnableProcessing();
// only enable if the Parameter is true
if (enable_preclude_last_level) {
options.preclude_last_level_data_seconds = 2000;
}
options.max_background_jobs = 8;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
cro.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
});
TEST_SYNC_POINT(
"PrecludeLastLevelTest::LastLevelOnlyCompactionConflit:StartWrite");
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
for (; sst_num < kNumTrigger * 2; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
// the value needs to be big enough to trigger full compaction
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeridicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
}
manual_compaction_thread.join();
ASSERT_OK(dbfull()->WaitForCompact(true));
if (enable_preclude_last_level) {
ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel());
} else {
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
}
ASSERT_TRUE(verified_compaction_order);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
stop_token.reset();
Close();
}
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTestWithParms,
PrecludeLastLevelTestWithParms, testing::Bool());
// partition the SST into 3 ranges [0, 19] [20, 39] [40, ...]
class ThreeRangesPartitioner : public SstPartitioner {
public:
const char* Name() const override { return "SingleKeySstPartitioner"; }
PartitionerResult ShouldPartition(
const PartitionerRequest& request) override {
if ((cmp->CompareWithoutTimestamp(*request.current_user_key,
DBTestBase::Key(20)) >= 0 &&
cmp->CompareWithoutTimestamp(*request.prev_user_key,
DBTestBase::Key(20)) < 0) ||
(cmp->CompareWithoutTimestamp(*request.current_user_key,
DBTestBase::Key(40)) >= 0 &&
cmp->CompareWithoutTimestamp(*request.prev_user_key,
DBTestBase::Key(40)) < 0)) {
return kRequired;
} else {
return kNotRequired;
}
}
bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
const Slice& /*largest_user_key*/) override {
return false;
}
const Comparator* cmp = BytewiseComparator();
};
class ThreeRangesPartitionerFactory : public SstPartitionerFactory {
public:
static const char* kClassName() {
return "TombstoneTestSstPartitionerFactory";
}
const char* Name() const override { return kClassName(); }
std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& /* context */) const override {
return std::unique_ptr<SstPartitioner>(new ThreeRangesPartitioner());
}
};
TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kKeyPerSec = 10;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// pass some time first, otherwise the first a few keys write time are going
// to be zero, and internally zero has special meaning: kUnknownSeqnoTime
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
Random rnd(301);
for (int i = 0; i < 300; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
}
ASSERT_OK(Flush());
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// make sure all data is compacted to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// Create 3 L5 files
auto factory = std::make_shared<ThreeRangesPartitionerFactory>();
options.sst_partitioner_factory = factory;
Reopen(options);
for (int i = 0; i < kNumTrigger - 1; i++) {
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(i * 100 + j), rnd.RandomString(10)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->WaitForCompact(true));
// L5: [0,19] [20,39] [40,299]
// L6: [0, 299]
ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
// enable tiered storage feature
options.preclude_last_level_data_seconds = 10000;
options.last_level_temperature = Temperature::kCold;
options.statistics = CreateDBStatistics();
Reopen(options);
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
ASSERT_EQ(meta.levels[5].files.size(), 3);
ASSERT_EQ(meta.levels[6].files.size(), 1);
ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(299));
std::string file_path = meta.levels[5].files[1].db_path;
std::vector<std::string> files;
// pick 3rd file @L5 + file@L6 for compaction
files.push_back(file_path + "/" + meta.levels[5].files[2].name);
files.push_back(file_path + "/" + meta.levels[6].files[0].name);
ASSERT_OK(db_->CompactFiles(CompactionOptions(), files, 6));
// The compaction only moved partial of the hot data to hot tier, range[0,39]
// is unsafe to move up, otherwise, they will be overlapped with the existing
// files@L5.
// The output should be:
// L5: [0,19] [20,39] [40,299] <-- Temperature::kUnknown
// L6: [0,19] [20,39] <-- Temperature::kCold
// L6 file is split because of the customized partitioner
ASSERT_EQ("0,0,0,0,0,3,2", FilesPerLevel());
// even all the data is hot, but not all data are moved to the hot tier
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
db_->GetColumnFamilyMetaData(&meta);
ASSERT_EQ(meta.levels[5].files.size(), 3);
ASSERT_EQ(meta.levels[6].files.size(), 2);
for (const auto& file : meta.levels[5].files) {
ASSERT_EQ(file.temperature, Temperature::kUnknown);
}
for (const auto& file : meta.levels[6].files) {
ASSERT_EQ(file.temperature, Temperature::kCold);
}
ASSERT_EQ(meta.levels[6].files[0].smallestkey, Key(0));
ASSERT_EQ(meta.levels[6].files[0].largestkey, Key(19));
ASSERT_EQ(meta.levels[6].files[1].smallestkey, Key(20));
ASSERT_EQ(meta.levels[6].files[1].largestkey, Key(39));
Close();
}
struct TestPropertiesCollector : public TablePropertiesCollector {
Status AddUserKey(const Slice& key, const Slice& /*value*/,
EntryType /*type*/, SequenceNumber /*seq*/,
uint64_t /*file_size*/) override {
if (cmp->Compare(key, DBTestBase::Key(100)) == 0) {
has_key_100 = true;
}
if (cmp->Compare(key, DBTestBase::Key(200)) == 0) {
has_key_200 = true;
}
return Status::OK();
}
const char* Name() const override { return "TestTablePropertiesCollector"; }
UserCollectedProperties GetReadableProperties() const override {
UserCollectedProperties ret;
return ret;
}
Status Finish(UserCollectedProperties* /*properties*/) override {
// The LSM tree would be like:
// L5: [0,19] [20,39] [40,299]
// L6: [0, 299]
// the 3rd file @L5 has both 100 and 200, which will be marked for
// compaction
// Also avoid marking flushed SST for compaction, which won't have both 100
// and 200
if (has_key_100 && has_key_200) {
need_compact_ = true;
} else {
need_compact_ = false;
}
has_key_100 = false;
has_key_200 = false;
return Status::OK();
}
bool NeedCompact() const override { return need_compact_; }
const Comparator* cmp = BytewiseComparator();
private:
bool has_key_100 = false;
bool has_key_200 = false;
bool need_compact_ = false;
};
class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
public:
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context /*context*/) override {
return new TestPropertiesCollector;
}
const char* Name() const override { return "TestTablePropertiesCollector"; }
};
TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompactionWithRangeDel) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kKeyPerSec = 10;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
// set a small max_compaction_bytes to avoid input level expansion
options.max_compaction_bytes = 30000;
options.ignore_max_compaction_bytes_for_input = false;
DestroyAndReopen(options);
// pass some time first, otherwise the first a few keys write time are going
// to be zero, and internally zero has special meaning: kUnknownSeqnoTime
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
Random rnd(301);
for (int i = 0; i < 300; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kKeyPerSec); });
}
ASSERT_OK(Flush());
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// make sure all data is compacted to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// Create 3 L5 files
auto factory = std::make_shared<ThreeRangesPartitionerFactory>();
options.sst_partitioner_factory = factory;
// the user defined properties_collector will mark the 3rd file for compaction
auto collector_factory = std::make_shared<TestPropertiesCollectorFactory>();
options.table_properties_collector_factories.resize(1);
options.table_properties_collector_factories[0] = collector_factory;
// enable tiered storage feature
options.preclude_last_level_data_seconds = 10000;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
for (int i = 0; i < kNumTrigger - 2; i++) {
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(i * 100 + j), rnd.RandomString(10)));
}
ASSERT_OK(Flush());
}
// make sure there is one and only one compaction supports per-key placement
// but has the penultimate level output disabled.
std::atomic_int per_key_comp_num = 0;
SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
if (compaction->SupportsPerKeyPlacement()) {
ASSERT_EQ(compaction->GetPenultimateOutputRangeType(),
Compaction::PenultimateOutputRangeType::kDisabled);
per_key_comp_num++;
}
});
SyncPoint::GetInstance()->EnableProcessing();
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(200 + j), rnd.RandomString(10)));
}
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(32), Key(40)));
ASSERT_OK(Flush());
// Before the per-key placement compaction, the LSM tress should be like:
// L5: [0,19] [20,40] [40,299]
// L6: [0, 299]
// The 2nd file @L5 has the largest key 40 because of range del
ASSERT_OK(dbfull()->WaitForCompact(true));
ASSERT_EQ(per_key_comp_num, 1);
// the compaction won't move any data to the penultimate level
ASSERT_EQ("0,0,0,0,0,2,3", FilesPerLevel());
Close();
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE

View File

@ -6923,7 +6923,7 @@ TEST_F(DBTest2, LastLevelTemperatureUniversal) {
ASSERT_EQ(size, 0);
ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
ASSERT_EQ(iostats->file_io_stats_by_temperature.warm_file_read_count, 0);
ASSERT_EQ(iostats->file_io_stats_by_temperature.hot_file_read_count, 0);
ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, 0);
ASSERT_EQ(options.statistics->getTickerCount(HOT_FILE_READ_BYTES), 0);
ASSERT_EQ(options.statistics->getTickerCount(WARM_FILE_READ_BYTES), 0);
ASSERT_EQ(options.statistics->getTickerCount(COLD_FILE_READ_BYTES), 0);