Refactor AddRangeDels() + consider range tombstone during compaction file cutting (#11113)

Summary:
A second attempt after https://github.com/facebook/rocksdb/issues/10802, with bug fixes and refactoring. This PR updates compaction logic to take range tombstones into account when determining whether to cut the current compaction output file (https://github.com/facebook/rocksdb/issues/4811). Before this change, only point keys were considered, and range tombstones could cause large compactions. For example, if the current compaction outputs is a range tombstone [a, b) and 2 point keys y, z, they would be added to the same file, and may overlap with too many files in the next level and cause a large compaction in the future. This PR also includes ajkr's effort to simplify the logic to add range tombstones to compaction output files in `AddRangeDels()` ([https://github.com/facebook/rocksdb/issues/11078](https://github.com/facebook/rocksdb/pull/11078#issuecomment-1386078861)).

The main change is for `CompactionIterator` to emit range tombstone start keys to be processed by `CompactionOutputs`. A new class `CompactionMergingIterator` is introduced to replace `MergingIterator` under `CompactionIterator` to enable emitting of range tombstone start keys. Further improvement after this PR include cutting compaction output at some grandparent boundary key (instead of the next output key) when cutting within a range tombstone to reduce overlap with grandparents.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11113

Test Plan:
* added unit test in db_range_del_test
* crash test with a small key range: `python3 tools/db_crashtest.py blackbox --simple --max_key=100 --interval=600 --write_buffer_size=262144 --target_file_size_base=256 --max_bytes_for_level_base=262144 --block_size=128 --value_size_mult=33 --subcompactions=10 --use_multiget=1 --delpercent=3 --delrangepercent=2 --verify_iterator_with_expected_state_one_in=2 --num_iterations=10`

Reviewed By: ajkr

Differential Revision: D42655709

Pulled By: cbi42

fbshipit-source-id: 8367e36ef5640e8f21c14a3855d4a8d6e360a34c
This commit is contained in:
Changyu Bi 2023-02-22 12:28:18 -08:00 committed by Facebook GitHub Bot
parent 9fa9becf53
commit 229297d1b8
26 changed files with 1410 additions and 582 deletions

View file

@ -838,6 +838,7 @@ set(SOURCES
table/get_context.cc table/get_context.cc
table/iterator.cc table/iterator.cc
table/merging_iterator.cc table/merging_iterator.cc
table/compaction_merging_iterator.cc
table/meta_blocks.cc table/meta_blocks.cc
table/persistent_cache_helper.cc table/persistent_cache_helper.cc
table/plain/plain_table_bloom.cc table/plain/plain_table_bloom.cc

View file

@ -1,5 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Behavior changes
* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys.
## 8.0.0 (02/19/2023) ## 8.0.0 (02/19/2023)
### Behavior changes ### Behavior changes

View file

@ -200,6 +200,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/block_based/reader_common.cc", "table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc", "table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc", "table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc", "table/cuckoo/cuckoo_table_reader.cc",
@ -543,6 +544,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/block_based/reader_common.cc", "table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc", "table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc", "table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc", "table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc", "table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc", "table/cuckoo/cuckoo_table_reader.cc",

View file

@ -123,6 +123,10 @@ class BlobCountingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop); return iter_->GetProperty(prop_name, prop);
} }
bool IsDeleteRangeSentinelKey() const override {
return iter_->IsDeleteRangeSentinelKey();
}
private: private:
void UpdateAndCountBlobIfNeeded() { void UpdateAndCountBlobIfNeeded() {
assert(!iter_->Valid() || iter_->status().ok()); assert(!iter_->Valid() || iter_->status().ok());

View file

@ -188,6 +188,11 @@ class ClippingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop); return iter_->GetProperty(prop_name, prop);
} }
bool IsDeleteRangeSentinelKey() const override {
assert(valid_);
return iter_->IsDeleteRangeSentinelKey();
}
private: private:
void UpdateValid() { void UpdateValid() {
assert(!iter_->Valid() || iter_->status().ok()); assert(!iter_->Valid() || iter_->status().ok());

View file

@ -464,6 +464,7 @@ void CompactionIterator::NextFromInput() {
value_ = input_.value(); value_ = input_.value();
blob_value_.Reset(); blob_value_.Reset();
iter_stats_.num_input_records++; iter_stats_.num_input_records++;
is_range_del_ = input_.IsDeleteRangeSentinelKey();
Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
if (!pik_status.ok()) { if (!pik_status.ok()) {
@ -483,7 +484,10 @@ void CompactionIterator::NextFromInput() {
break; break;
} }
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
if (is_range_del_) {
validity_info_.SetValid(kRangeDeletion);
break;
}
// Update input statistics // Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) { ikey_.type == kTypeDeletionWithTimestamp) {
@ -705,6 +709,14 @@ void CompactionIterator::NextFromInput() {
ParsedInternalKey next_ikey; ParsedInternalKey next_ikey;
AdvanceInputIter(); AdvanceInputIter();
while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
// skip range tombstone start keys with the same user key
// since they are not "real" point keys.
AdvanceInputIter();
}
// Check whether the next key exists, is not corrupt, and is the same key // Check whether the next key exists, is not corrupt, and is the same key
// as the single delete. // as the single delete.
@ -712,6 +724,7 @@ void CompactionIterator::NextFromInput() {
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() && .ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
assert(!input_.IsDeleteRangeSentinelKey());
#ifndef NDEBUG #ifndef NDEBUG
const Compaction* c = const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr; compaction_ ? compaction_->real_compaction() : nullptr;
@ -936,12 +949,14 @@ void CompactionIterator::NextFromInput() {
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
// considered to have a different user key unless the timestamp is older // considered to have a different user key unless the timestamp is older
// than *full_history_ts_low_. // than *full_history_ts_low_.
//
// Range tombstone start keys are skipped as they are not "real" keys.
while (!IsPausingManualCompaction() && !IsShuttingDown() && while (!IsPausingManualCompaction() && !IsShuttingDown() &&
input_.Valid() && input_.Valid() &&
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok()) && .ok()) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
(prev_snapshot == 0 || (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) { DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
AdvanceInputIter(); AdvanceInputIter();
} }
@ -1235,11 +1250,13 @@ void CompactionIterator::DecideOutputLevel() {
void CompactionIterator::PrepareOutput() { void CompactionIterator::PrepareOutput() {
if (Valid()) { if (Valid()) {
if (LIKELY(!is_range_del_)) {
if (ikey_.type == kTypeValue) { if (ikey_.type == kTypeValue) {
ExtractLargeValueIfNeeded(); ExtractLargeValueIfNeeded();
} else if (ikey_.type == kTypeBlobIndex) { } else if (ikey_.type == kTypeBlobIndex) {
GarbageCollectBlobIfNeeded(); GarbageCollectBlobIfNeeded();
} }
}
if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) { if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
DecideOutputLevel(); DecideOutputLevel();
@ -1261,7 +1278,7 @@ void CompactionIterator::PrepareOutput() {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ && ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ && !output_to_penultimate_level_ &&
ikey_.sequence < preserve_time_min_seqno_) { ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) {
if (ikey_.type == kTypeDeletion || if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL( ROCKS_LOG_FATAL(

View file

@ -63,6 +63,10 @@ class SequenceIterWrapper : public InternalIterator {
void SeekToLast() override { assert(false); } void SeekToLast() override { assert(false); }
uint64_t num_itered() const { return num_itered_; } uint64_t num_itered() const { return num_itered_; }
bool IsDeleteRangeSentinelKey() const override {
assert(Valid());
return inner_iter_->IsDeleteRangeSentinelKey();
}
private: private:
InternalKeyComparator icmp_; InternalKeyComparator icmp_;
@ -242,7 +246,12 @@ class CompactionIterator {
const Status& status() const { return status_; } const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; } const ParsedInternalKey& ikey() const { return ikey_; }
inline bool Valid() const { return validity_info_.IsValid(); } inline bool Valid() const { return validity_info_.IsValid(); }
const Slice& user_key() const { return current_user_key_; } const Slice& user_key() const {
if (UNLIKELY(is_range_del_)) {
return ikey_.user_key;
}
return current_user_key_;
}
const CompactionIterationStats& iter_stats() const { return iter_stats_; } const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); } uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
// If the current key should be placed on penultimate level, only valid if // If the current key should be placed on penultimate level, only valid if
@ -252,6 +261,8 @@ class CompactionIterator {
} }
Status InputStatus() const { return input_.status(); } Status InputStatus() const { return input_.status(); }
bool IsDeleteRangeSentinelKey() const { return is_range_del_; }
private: private:
// Processes the input stream to find the next output // Processes the input stream to find the next output
void NextFromInput(); void NextFromInput();
@ -385,6 +396,7 @@ class CompactionIterator {
kKeepSD = 8, kKeepSD = 8,
kKeepDel = 9, kKeepDel = 9,
kNewUserKey = 10, kNewUserKey = 10,
kRangeDeletion = 11,
}; };
struct ValidityInfo { struct ValidityInfo {
@ -493,6 +505,10 @@ class CompactionIterator {
// This is a best-effort facility, so memory_order_relaxed is sufficient. // This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_canceled_.load(std::memory_order_relaxed); return manual_compaction_canceled_.load(std::memory_order_relaxed);
} }
// Stores whether the current compaction iterator output
// is a range tombstone start key.
bool is_range_del_{false};
}; };
inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq, inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,

View file

@ -1118,6 +1118,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
IterKey end_ikey; IterKey end_ikey;
Slice start_slice; Slice start_slice;
Slice end_slice; Slice end_slice;
Slice start_user_key{};
Slice end_user_key{};
static constexpr char kMaxTs[] = static constexpr char kMaxTs[] =
"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"; "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
@ -1140,6 +1142,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&ts_slice); &ts_slice);
} }
start_slice = start_ikey.GetInternalKey(); start_slice = start_ikey.GetInternalKey();
start_user_key = start_ikey.GetUserKey();
} }
if (end.has_value()) { if (end.has_value()) {
end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek); end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
@ -1148,6 +1151,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&ts_slice); &ts_slice);
} }
end_slice = end_ikey.GetInternalKey(); end_slice = end_ikey.GetInternalKey();
end_user_key = end_ikey.GetUserKey();
} }
std::unique_ptr<InternalIterator> clip; std::unique_ptr<InternalIterator> clip;
@ -1263,11 +1267,15 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
[this, sub_compact](CompactionOutputs& outputs) { [this, sub_compact](CompactionOutputs& outputs) {
return this->OpenCompactionOutputFile(sub_compact, outputs); return this->OpenCompactionOutputFile(sub_compact, outputs);
}; };
const CompactionFileCloseFunc close_file_func = const CompactionFileCloseFunc close_file_func =
[this, sub_compact](CompactionOutputs& outputs, const Status& status, [this, sub_compact, start_user_key, end_user_key](
CompactionOutputs& outputs, const Status& status,
const Slice& next_table_min_key) { const Slice& next_table_min_key) {
return this->FinishCompactionOutputFile(status, sub_compact, outputs, return this->FinishCompactionOutputFile(
next_table_min_key); status, sub_compact, outputs, next_table_min_key,
sub_compact->start.has_value() ? &start_user_key : nullptr,
sub_compact->end.has_value() ? &end_user_key : nullptr);
}; };
Status status; Status status;
@ -1278,7 +1286,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true. // returns true.
assert(!end.has_value() || cfd->user_comparator()->Compare( assert(!end.has_value() || cfd->user_comparator()->Compare(
c_iter->user_key(), end.value()) < 0); c_iter->user_key(), end.value()) < 0);
@ -1458,7 +1465,8 @@ void CompactionJob::RecordDroppedKeys(
Status CompactionJob::FinishCompactionOutputFile( Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact, const Status& input_status, SubcompactionState* sub_compact,
CompactionOutputs& outputs, const Slice& next_table_min_key) { CompactionOutputs& outputs, const Slice& next_table_min_key,
const Slice* comp_start_user_key, const Slice* comp_end_user_key) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE); ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(sub_compact != nullptr); assert(sub_compact != nullptr);
@ -1488,12 +1496,10 @@ Status CompactionJob::FinishCompactionOutputFile(
// output_to_penultimate_level compaction here, as it's only used to decide // output_to_penultimate_level compaction here, as it's only used to decide
// if range dels could be dropped. // if range dels could be dropped.
if (outputs.HasRangeDel()) { if (outputs.HasRangeDel()) {
s = outputs.AddRangeDels( s = outputs.AddRangeDels(comp_start_user_key, comp_end_user_key,
sub_compact->start.has_value() ? &(sub_compact->start.value()) range_del_out_stats, bottommost_level_,
: nullptr, cfd->internal_comparator(), earliest_snapshot,
sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr, next_table_min_key, full_history_ts_low_);
range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
earliest_snapshot, next_table_min_key, full_history_ts_low_);
} }
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");

View file

@ -256,7 +256,9 @@ class CompactionJob {
Status FinishCompactionOutputFile(const Status& input_status, Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact, SubcompactionState* sub_compact,
CompactionOutputs& outputs, CompactionOutputs& outputs,
const Slice& next_table_min_key); const Slice& next_table_min_key,
const Slice* comp_start_user_key,
const Slice* comp_end_user_key);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact, Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs); CompactionOutputs& outputs);

View file

@ -226,6 +226,15 @@ uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes(
bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) { bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
assert(c_iter.Valid()); assert(c_iter.Valid());
const Slice& internal_key = c_iter.key(); const Slice& internal_key = c_iter.key();
#ifndef NDEBUG
bool should_stop = false;
std::pair<bool*, const Slice> p{&should_stop, internal_key};
TEST_SYNC_POINT_CALLBACK(
"CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p);
if (should_stop) {
return true;
}
#endif // NDEBUG
const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_; const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_;
const InternalKeyComparator* icmp = const InternalKeyComparator* icmp =
&compaction_->column_family_data()->internal_comparator(); &compaction_->column_family_data()->internal_comparator();
@ -347,8 +356,14 @@ Status CompactionOutputs::AddToOutput(
const CompactionFileOpenFunc& open_file_func, const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) { const CompactionFileCloseFunc& close_file_func) {
Status s; Status s;
bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
if (is_range_del && compaction_->bottommost_level()) {
// We don't consider range tombstone for bottommost level since:
// 1. there is no grandparent and hence no overlap to consider
// 2. range tombstone may be dropped at bottommost level.
return s;
}
const Slice& key = c_iter.key(); const Slice& key = c_iter.key();
if (ShouldStopBefore(c_iter) && HasBuilder()) { if (ShouldStopBefore(c_iter) && HasBuilder()) {
s = close_file_func(*this, c_iter.InputStatus(), key); s = close_file_func(*this, c_iter.InputStatus(), key);
if (!s.ok()) { if (!s.ok()) {
@ -358,6 +373,13 @@ Status CompactionOutputs::AddToOutput(
grandparent_boundary_switched_num_ = 0; grandparent_boundary_switched_num_ = 0;
grandparent_overlapped_bytes_ = grandparent_overlapped_bytes_ =
GetCurrentKeyGrandparentOverlappedBytes(key); GetCurrentKeyGrandparentOverlappedBytes(key);
if (UNLIKELY(is_range_del)) {
// lower bound for this new output file, this is needed as the lower bound
// does not come from the smallest point key in this case.
range_tombstone_lower_bound_.DecodeFrom(key);
} else {
range_tombstone_lower_bound_.Clear();
}
} }
// Open output file if necessary // Open output file if necessary
@ -368,6 +390,17 @@ Status CompactionOutputs::AddToOutput(
} }
} }
// c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
// here before returning below when `is_range_del` is true
if (partitioner_) {
last_key_for_partitioner_.assign(c_iter.user_key().data_,
c_iter.user_key().size_);
}
if (UNLIKELY(is_range_del)) {
return s;
}
assert(builder_ != nullptr); assert(builder_ != nullptr);
const Slice& value = c_iter.value(); const Slice& value = c_iter.value();
s = current_output().validator.Add(key, value); s = current_output().validator.Add(key, value);
@ -391,28 +424,33 @@ Status CompactionOutputs::AddToOutput(
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type); ikey.type);
if (partitioner_) {
last_key_for_partitioner_.assign(c_iter.user_key().data_,
c_iter.user_key().size_);
}
return s; return s;
} }
namespace {
void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key,
const size_t ts_sz) {
if (ts_sz) {
static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
if (ts_sz <= strlen(kTsMax)) {
internal_key = InternalKey(user_key, kMaxSequenceNumber,
kTypeRangeDeletion, Slice(kTsMax, ts_sz));
} else {
internal_key =
InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion,
std::string(ts_sz, '\xff'));
}
} else {
internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion);
}
}
} // namespace
Status CompactionOutputs::AddRangeDels( Status CompactionOutputs::AddRangeDels(
const Slice* comp_start_user_key, const Slice* comp_end_user_key, const Slice* comp_start_user_key, const Slice* comp_end_user_key,
CompactionIterationStats& range_del_out_stats, bool bottommost_level, CompactionIterationStats& range_del_out_stats, bool bottommost_level,
const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
const Slice& next_table_min_key, const std::string& full_history_ts_low) { const Slice& next_table_min_key, const std::string& full_history_ts_low) {
assert(HasRangeDel());
FileMetaData& meta = current_output().meta;
const Comparator* ucmp = icmp.user_comparator();
Slice lower_bound_guard, upper_bound_guard;
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
bool lower_bound_from_sub_compact = false;
// The following example does not happen since // The following example does not happen since
// CompactionOutput::ShouldStopBefore() always return false for the first // CompactionOutput::ShouldStopBefore() always return false for the first
// point key. But we should consider removing this dependency. Suppose for the // point key. But we should consider removing this dependency. Suppose for the
@ -424,98 +462,134 @@ Status CompactionOutputs::AddRangeDels(
// Then meta.smallest will be set to comp_start_user_key@seqno // Then meta.smallest will be set to comp_start_user_key@seqno
// and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber
// which violates the assumption that meta.smallest should be <= meta.largest. // which violates the assumption that meta.smallest should be <= meta.largest.
assert(HasRangeDel());
FileMetaData& meta = current_output().meta;
const Comparator* ucmp = icmp.user_comparator();
InternalKey lower_bound_buf, upper_bound_buf;
Slice lower_bound_guard, upper_bound_guard;
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
// We first determine the internal key lower_bound and upper_bound for
// this output file. All and only range tombstones that overlap with
// [lower_bound, upper_bound] should be added to this file. File
// boundaries (meta.smallest/largest) should be updated accordingly when
// extended by range tombstones.
size_t output_size = outputs_.size(); size_t output_size = outputs_.size();
if (output_size == 1) { if (output_size == 1) {
// For the first output table, include range tombstones before the min // This is the first file in the subcompaction.
// key but after the subcompaction boundary. //
lower_bound = comp_start_user_key; // When outputting a range tombstone that spans a subcompaction boundary,
lower_bound_from_sub_compact = true; // the files on either side of that boundary need to include that
} else if (meta.smallest.size() > 0) { // boundary's user key. Otherwise, the spanning range tombstone would lose
// For subsequent output tables, only include range tombstones from min // coverage.
// key onwards since the previous file was extended to contain range //
// tombstones falling before min key. // To achieve this while preventing files from overlapping in internal key
smallest_user_key = meta.smallest.user_key().ToString(false /*hex*/); // (an LSM invariant violation), we allow the earlier file to include the
lower_bound_guard = Slice(smallest_user_key); // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The
// later file can begin at the boundary user key at the newest key version
// it contains. At this point that version number is unknown since we have
// not processed the range tombstones yet, so permit any version. Same story
// applies to timestamp, and a non-nullptr `comp_start_user_key` should have
// `kMaxTs` here, which similarly permits any timestamp.
if (comp_start_user_key) {
lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber,
kTypeRangeDeletion);
lower_bound_guard = lower_bound_buf.Encode();
lower_bound = &lower_bound_guard; lower_bound = &lower_bound_guard;
} else { } else {
lower_bound = nullptr; lower_bound = nullptr;
} }
if (!next_table_min_key.empty()) {
// This may be the last file in the subcompaction in some cases, so we
// need to compare the end key of subcompaction with the next file start
// key. When the end key is chosen by the subcompaction, we know that
// it must be the biggest key in output file. Therefore, it is safe to
// use the smaller key as the upper bound of the output file, to ensure
// that there is no overlapping between different output files.
upper_bound_guard = ExtractUserKey(next_table_min_key);
if (comp_end_user_key != nullptr &&
ucmp->CompareWithoutTimestamp(upper_bound_guard, *comp_end_user_key) >=
0) {
upper_bound = comp_end_user_key;
} else { } else {
upper_bound = &upper_bound_guard; // For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
// tombstones falling before min key.
if (range_tombstone_lower_bound_.size() > 0) {
assert(meta.smallest.size() == 0 ||
icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0);
lower_bound_guard = range_tombstone_lower_bound_.Encode();
} else {
assert(meta.smallest.size() > 0);
lower_bound_guard = meta.smallest.Encode();
} }
} else { lower_bound = &lower_bound_guard;
// This is the last file in the subcompaction, so extend until the
// subcompaction ends.
upper_bound = comp_end_user_key;
}
bool has_overlapping_endpoints;
if (upper_bound != nullptr && meta.largest.size() > 0) {
has_overlapping_endpoints = ucmp->CompareWithoutTimestamp(
meta.largest.user_key(), *upper_bound) == 0;
} else {
has_overlapping_endpoints = false;
} }
const size_t ts_sz = ucmp->timestamp_size();
if (next_table_min_key.empty()) {
// Last file of the subcompaction.
if (comp_end_user_key) {
upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber,
kTypeRangeDeletion);
upper_bound_guard = upper_bound_buf.Encode();
upper_bound = &upper_bound_guard;
} else {
upper_bound = nullptr;
}
} else {
// There is another file coming whose coverage will begin at
// `next_table_min_key`. The current file needs to extend range tombstone
// coverage through its own keys (through `meta.largest`) and through user
// keys preceding `next_table_min_key`'s user key.
ParsedInternalKey next_table_min_key_parsed;
ParseInternalKey(next_table_min_key, &next_table_min_key_parsed,
false /* log_err_key */)
.PermitUncheckedError();
assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber);
assert(meta.largest.size() == 0 ||
icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0);
assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0);
if (meta.largest.size() > 0 &&
ucmp->EqualWithoutTimestamp(meta.largest.user_key(),
next_table_min_key_parsed.user_key)) {
// Caution: this assumes meta.largest.Encode() lives longer than
// upper_bound, which is only true if meta.largest is never updated.
// This just happens to be the case here since meta.largest serves
// as the upper_bound.
upper_bound_guard = meta.largest.Encode();
} else {
SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key,
ts_sz);
upper_bound_guard = upper_bound_buf.Encode();
}
upper_bound = &upper_bound_guard;
}
if (lower_bound && upper_bound &&
icmp.Compare(*lower_bound, *upper_bound) > 0) {
assert(meta.smallest.size() == 0 &&
ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound),
ExtractUserKey(*upper_bound)));
// This can only happen when lower_bound have the same user key as
// next_table_min_key and that there is no point key in the current
// compaction output file.
return Status::OK();
}
// The end key of the subcompaction must be bigger or equal to the upper // The end key of the subcompaction must be bigger or equal to the upper
// bound. If the end of subcompaction is null or the upper bound is null, // bound. If the end of subcompaction is null or the upper bound is null,
// it means that this file is the last file in the compaction. So there // it means that this file is the last file in the compaction. So there
// will be no overlapping between this file and others. // will be no overlapping between this file and others.
assert(comp_end_user_key == nullptr || upper_bound == nullptr || assert(comp_end_user_key == nullptr || upper_bound == nullptr ||
ucmp->CompareWithoutTimestamp(*upper_bound, *comp_end_user_key) <= 0); ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound),
auto it = range_del_agg_->NewIterator(lower_bound, upper_bound, *comp_end_user_key) <= 0);
has_overlapping_endpoints); auto it = range_del_agg_->NewIterator(lower_bound, upper_bound);
// Position the range tombstone output iterator. There may be tombstone
// fragments that are entirely out of range, so make sure that we do not
// include those.
if (lower_bound != nullptr) {
it->Seek(*lower_bound);
} else {
it->SeekToFirst();
}
Slice last_tombstone_start_user_key{}; Slice last_tombstone_start_user_key{};
for (; it->Valid(); it->Next()) { bool reached_lower_bound = false;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto tombstone = it->Tombstone(); auto tombstone = it->Tombstone();
if (upper_bound != nullptr) { auto kv = tombstone.Serialize();
int cmp = InternalKey tombstone_end = tombstone.SerializeEndKey();
ucmp->CompareWithoutTimestamp(*upper_bound, tombstone.start_key_); // TODO: the underlying iterator should support clamping the bounds.
// Tombstones starting after upper_bound only need to be included in // tombstone_end.Encode is of form user_key@kMaxSeqno
// the next table. // if it is equal to lower_bound, there is no need to include
// If the current SST ends before upper_bound, i.e., // such range tombstone.
// `has_overlapping_endpoints == false`, we can also skip over range if (!reached_lower_bound && lower_bound &&
// tombstones that start exactly at upper_bound. Such range icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) {
// tombstones will be included in the next file and are not relevant continue;
// to the point keys or endpoints of the current file.
// If the current SST ends at the same user key at upper_bound,
// i.e., `has_overlapping_endpoints == true`, AND the tombstone has
// the same start key as upper_bound, i.e., cmp == 0, then
// the tombstone is relevant only if the tombstone's sequence number
// is no larger than this file's largest key's sequence number. This
// is because the upper bound to truncate this file's range tombstone
// will be meta.largest in this case, and any tombstone that starts after
// it will not be relevant.
if (cmp < 0) {
break;
} else if (cmp == 0) {
if (!has_overlapping_endpoints ||
tombstone.seq_ < GetInternalKeySeqno(meta.largest.Encode())) {
break;
}
}
} }
assert(!lower_bound ||
icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0);
reached_lower_bound = true;
const size_t ts_sz = ucmp->timestamp_size();
// Garbage collection for range tombstones. // Garbage collection for range tombstones.
// If user-defined timestamp is enabled, range tombstones are dropped if // If user-defined timestamp is enabled, range tombstones are dropped if
// they are at bottommost_level, below full_history_ts_low and not visible // they are at bottommost_level, below full_history_ts_low and not visible
@ -534,83 +608,93 @@ Status CompactionOutputs::AddRangeDels(
continue; continue;
} }
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr || assert(lower_bound == nullptr ||
ucmp->CompareWithoutTimestamp(*lower_bound, kv.second) < 0); ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound),
kv.second) < 0);
InternalKey tombstone_start = kv.first;
if (lower_bound &&
ucmp->CompareWithoutTimestamp(tombstone_start.user_key(),
ExtractUserKey(*lower_bound)) < 0) {
// This just updates the non-timestamp portion of `tombstone_start`'s user
// key. Ideally there would be a simpler API usage
ParsedInternalKey tombstone_start_parsed;
ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed,
false /* log_err_key */)
.PermitUncheckedError();
// timestamp should be from where sequence number is from, which is from
// tombstone in this case
std::string ts =
tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size())
.ToString();
tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound);
tombstone_start.SetFrom(tombstone_start_parsed, ts);
}
if (upper_bound != nullptr &&
icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) {
break;
}
// Here we show that *only* range tombstones that overlap with
// [lower_bound, upper_bound] are added to the current file, and
// sanity checking invariants that should hold:
// - [tombstone_start, tombstone_end] overlaps with [lower_bound,
// upper_bound]
// - meta.smallest <= meta.largest
// Corresponding assertions are made, the proof is broken is any of them
// fails.
// TODO: show that *all* range tombstones that overlap with
// [lower_bound, upper_bound] are added.
// TODO: some invariant about boundaries are correctly updated.
//
// Note that `tombstone_start` is updated in the if condition above, we use
// tombstone_start to refer to its initial value, i.e.,
// it->Tombstone().first, and use tombstone_start* to refer to its value
// after the update.
//
// To show [lower_bound, upper_bound] overlaps with [tombstone_start,
// tombstone_end]:
// lower_bound <= upper_bound from the if condition right after all
// bounds are initialized. We assume each tombstone fragment has
// start_key.user_key < end_key.user_key, so
// tombstone_start < tombstone_end by
// FragmentedTombstoneIterator::Tombstone(). So these two ranges are both
// non-emtpy. The flag `reached_lower_bound` and the if logic before it
// ensures lower_bound <= tombstone_end. tombstone_start is only updated
// if it has a smaller user_key than lower_bound user_key, so
// tombstone_start <= tombstone_start*. The above if condition implies
// tombstone_start* <= upper_bound. So we have
// tombstone_start <= upper_bound and lower_bound <= tombstone_end
// and the two ranges overlap.
//
// To show meta.smallest <= meta.largest:
// From the implementation of UpdateBoundariesForRange(), it suffices to
// prove that when it is first called in this function, its parameters
// satisfy `start <= end`, where start = max(tombstone_start*, lower_bound)
// and end = min(tombstone_end, upper_bound). From the above proof we have
// lower_bound <= tombstone_end and lower_bound <= upper_bound. We only need
// to show that tombstone_start* <= min(tombstone_end, upper_bound).
// Note that tombstone_start*.user_key = max(tombstone_start.user_key,
// lower_bound.user_key). Assuming tombstone_end always has
// kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber.
// Since lower_bound <= tombstone_end and lower_bound.seqno <
// tombstone_end.seqno (in absolute number order, not internal key order),
// lower_bound.user_key < tombstone_end.user_key.
// Since lower_bound.user_key < tombstone_end.user_key and
// tombstone_start.user_key < tombstone_end.user_key, tombstone_start* <
// tombstone_end. Since tombstone_start* <= upper_bound from the above proof
// and tombstone_start* < tombstone_end, tombstone_start* <=
// min(tombstone_end, upper_bound), so the two ranges overlap.
// Range tombstone is not supported by output validator yet. // Range tombstone is not supported by output validator yet.
builder_->Add(kv.first.Encode(), kv.second); builder_->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_start = std::move(kv.first); if (lower_bound &&
InternalKey smallest_candidate{tombstone_start}; icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) {
if (lower_bound != nullptr && tombstone_start.DecodeFrom(*lower_bound);
ucmp->CompareWithoutTimestamp(smallest_candidate.user_key(),
*lower_bound) <= 0) {
// Pretend the smallest key has the same user key as lower_bound
// (the max key in the previous table or subcompaction) in order for
// files to appear key-space partitioned.
if (lower_bound_from_sub_compact) {
// When lower_bound is chosen by a subcompaction
// (lower_bound_from_sub_compact), we know that subcompactions over
// smaller keys cannot contain any keys at lower_bound. We also know
// that smaller subcompactions exist, because otherwise the
// subcompaction woud be unbounded on the left. As a result, we know
// that no other files on the output level will contain actual keys at
// lower_bound (an output file may have a largest key of
// lower_bound@kMaxSequenceNumber, but this only indicates a large range
// tombstone was truncated). Therefore, it is safe to use the
// tombstone's sequence number, to ensure that keys at lower_bound at
// lower levels are covered by truncated tombstones.
if (ts_sz) {
assert(tombstone.ts_.size() == ts_sz);
smallest_candidate = InternalKey(*lower_bound, tombstone.seq_,
kTypeRangeDeletion, tombstone.ts_);
} else {
smallest_candidate =
InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion);
} }
} else { if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) {
// If lower_bound was chosen by the smallest data key in the file, tombstone_end.DecodeFrom(*upper_bound);
// choose lowest seqnum so this file's smallest internal key comes
// after the previous file's largest. The fake seqnum is OK because
// the read path's file-picking code only considers user key.
smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
} }
} assert(icmp.Compare(tombstone_start, tombstone_end) <= 0);
InternalKey tombstone_end = tombstone.SerializeEndKey(); meta.UpdateBoundariesForRange(tombstone_start, tombstone_end,
InternalKey largest_candidate{tombstone_end};
if (upper_bound != nullptr &&
ucmp->CompareWithoutTimestamp(*upper_bound,
largest_candidate.user_key()) <= 0) {
// Pretend the largest key has the same user key as upper_bound (the
// min key in the following table or subcompaction) in order for files
// to appear key-space partitioned.
//
// Choose highest seqnum so this file's largest internal key comes
// before the next file's/subcompaction's smallest. The fake seqnum is
// OK because the read path's file-picking code only considers the
// user key portion.
//
// Note Seek() also creates InternalKey with (user_key,
// kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
// kTypeRangeDeletion (0xF), so the range tombstone comes before the
// Seek() key in InternalKey's ordering. So Seek() will look in the
// next file for the user key
if (ts_sz) {
static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
if (ts_sz <= strlen(kTsMax)) {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion,
Slice(kTsMax, ts_sz));
} else {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion,
std::string(ts_sz, '\xff'));
}
} else {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
}
}
meta.UpdateBoundariesForRange(smallest_candidate, largest_candidate,
tombstone.seq_, icmp); tombstone.seq_, icmp);
if (!bottommost_level) { if (!bottommost_level) {
bool start_user_key_changed = bool start_user_key_changed =
@ -618,17 +702,8 @@ Status CompactionOutputs::AddRangeDels(
ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key, ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
it->start_key()) < 0; it->start_key()) < 0;
last_tombstone_start_user_key = it->start_key(); last_tombstone_start_user_key = it->start_key();
// Range tombstones are truncated at file boundaries
if (icmp.Compare(tombstone_start, meta.smallest) < 0) {
tombstone_start = meta.smallest;
}
if (icmp.Compare(tombstone_end, meta.largest) > 0) {
tombstone_end = meta.largest;
}
// this assertion validates invariant (2) in the comment below.
assert(icmp.Compare(tombstone_start, tombstone_end) <= 0);
if (start_user_key_changed) { if (start_user_key_changed) {
// if tombstone_start >= tombstone_end, then either no key range is // If tombstone_start >= tombstone_end, then either no key range is
// covered, or that they have the same user key. If they have the same // covered, or that they have the same user key. If they have the same
// user key, then the internal key range should only be within this // user key, then the internal key range should only be within this
// level, and no keys from older levels is covered. // level, and no keys from older levels is covered.
@ -646,138 +721,6 @@ Status CompactionOutputs::AddRangeDels(
} }
} }
} }
// TODO: show invariants that ensure all necessary range tombstones are
// added
// and that file boundaries ensure no coverage is lost.
// Each range tombstone with internal key range [tombstone_start,
// tombstone_end] is being added to the current compaction output file here.
// The range tombstone is going to be truncated at range [meta.smallest,
// meta.largest] during reading/scanning. We should maintain invariants
// (1) meta.smallest <= meta.largest and,
// (2) [tombstone_start, tombstone_end] and [meta.smallest, meta.largest]
// overlaps, as there is no point adding range tombstone with a range
// outside the file's range.
// Since `tombstone_end` is always some user_key@kMaxSeqno, it is okay to
// use either open or closed range. Using closed range here to make
// reasoning easier, and it is more consistent with an ongoing work that
// tries to simplify this method.
//
// There are two cases:
// Case 1. Output file has no point key:
// First we show this case only happens when the entire compaction output
// is range tombstone only. This is true if CompactionIterator does not
// emit any point key. Suppose CompactionIterator emits some point key.
// Based on the assumption that CompactionOutputs::ShouldStopBefore()
// always return false for the first point key, the first compaction
// output file always contains a point key. Each new compaction output
// file is created if there is a point key for which ShouldStopBefore()
// returns true, and the point key would be added to the new compaction
// output file. So each new compaction file always contains a point key.
// So Case 1 only happens when CompactionIterator does not emit any
// point key.
//
// To show (1) meta.smallest <= meta.largest:
// Since the compaction output is range tombstone only, `lower_bound` and
// `upper_bound` are either null or comp_start/end_user_key respectively.
// According to how UpdateBoundariesForRange() is implemented, it blindly
// updates meta.smallest and meta.largest to smallest_candidate and
// largest_candidate the first time it is called. Subsequently, it
// compares input parameter with meta.smallest and meta.largest and only
// updates them when input is smaller/larger. So we only need to show
// smallest_candidate <= largest_candidate the first time
// UpdateBoundariesForRange() is called. Here we show something stronger
// that smallest_candidate.user_key < largest_candidate.user_key always
// hold for Case 1.
// We assume comp_start_user_key < comp_end_user_key, if provided. We
// assume that tombstone_start < tombstone_end. This assumption is based
// on that each fragment in FragmentedTombstoneList has
// start_key < end_key (user_key) and that
// FragmentedTombstoneIterator::Tombstone() returns the pair
// (start_key@tombstone_seqno with op_type kTypeRangeDeletion, end_key).
// The logic in this loop sets smallest_candidate to
// max(tombstone_start.user_key, comp_start_user_key)@tombstone.seq_ with
// op_type kTypeRangeDeletion, largest_candidate to
// min(tombstone_end.user_key, comp_end_user_key)@kMaxSequenceNumber with
// op_type kTypeRangeDeletion. When a bound is null, there is no
// truncation on that end. To show that smallest_candidate.user_key <
// largest_candidate.user_key, it suffices to show
// tombstone_start.user_key < comp_end_user_key (if not null) AND
// comp_start_user_key (if not null) < tombstone_end.user_key.
// Since the file has no point key, `has_overlapping_endpoints` is false.
// In the first sanity check of this for-loop, we compare
// tombstone_start.user_key against upper_bound = comp_end_user_key,
// and only proceed if tombstone_start.user_key < comp_end_user_key.
// We assume FragmentedTombstoneIterator::Seek(k) lands
// on a tombstone with end_key > k. So the call it->Seek(*lower_bound)
// above implies compact_start_user_key < tombstone_end.user_key.
//
// To show (2) [tombstone_start, tombstone_end] and [meta.smallest,
// meta.largest] overlaps (after the call to UpdateBoundariesForRange()):
// In the proof for (1) we have shown that
// smallest_candidate <= largest_candidate. Since tombstone_start <=
// smallest_candidate <= largest_candidate <= tombstone_end, for (2) to
// hold, it suffices to show that [smallest_candidate, largest_candidate]
// overlaps with [meta.smallest, meta.largest]. too.
// Given meta.smallest <= meta.largest shown above, we need to show
// that it is impossible to have largest_candidate < meta.smallest or
// meta.largest < smallest_candidate. If the above
// meta.UpdateBoundariesForRange(smallest_candidate, largest_candidate)
// updates meta.largest or meta.smallest, then the two ranges overlap.
// So we assume meta.UpdateBoundariesForRange(smallest_candidate,
// largest_candidate) did not update meta.smallest nor meta.largest, which
// means meta.smallest < smallest_candidate and largest_candidate <
// meta.largest.
//
// Case 2. Output file has >= 1 point key. This means meta.smallest and
// meta.largest are not empty when AddRangeDels() is called.
// To show (1) meta.smallest <= meta.largest:
// Assume meta.smallest <= meta.largest when AddRangeDels() is called,
// this follow from how UpdateBoundariesForRange() is implemented where it
// takes min or max to update meta.smallest or meta.largest.
//
// To show (2) [tombstone_start, tombstone_end] and [meta.smallest,
// meta.largest] overlaps (after the call to UpdateBoundariesForRange()):
// When smallest_candidate <= largest_candidate, the proof in Case 1
// applies, so we only need to show (2) holds when smallest_candidate >
// largest_candidate. When both bounds are either null or from
// subcompaction boundary, the proof in Case 1 applies, so we only need to
// show (2) holds when at least one bound is from a point key (either
// meta.smallest for lower bound or next_table_min_key for upper bound).
//
// Suppose lower bound is meta.smallest.user_key. The call
// it->Seek(*lower_bound) implies tombstone_end.user_key >
// meta.smallest.user_key. We have smallest_candidate.user_key =
// max(tombstone_start.user_key, meta.smallest.user_key). For
// smallest_candidate to be > largest_candidate, we need
// largest_candidate.user_key = upper_bound = smallest_candidate.user_key,
// where tombstone_end is truncated to largest_candidate.
// Subcase 1:
// Suppose largest_candidate.user_key = comp_end_user_key (there is no
// next point key). Subcompaction ensures any point key from this
// subcompaction has a user_key < comp_end_user_key, so 1)
// meta.smallest.user_key < comp_end_user_key, 2)
// `has_overlapping_endpoints` is false, and the first if condition in
// this for-loop ensures tombstone_start.user_key < comp_end_user_key. So
// smallest_candidate.user_key < largest_candidate.user_key. This case
// cannot happen when smallest > largest_candidate.
// Subcase 2:
// Suppose largest_candidate.user_key = next_table_min_key.user_key.
// The first if condition in this for-loop together with
// smallest_candidate.user_key = next_table_min_key.user_key =
// upper_bound implies `has_overlapping_endpoints` is true (so meta
// largest.user_key = upper_bound) and
// tombstone.seq_ < meta.largest.seqno. So
// tombstone_start < meta.largest < tombstone_end.
//
// Suppose lower bound is comp_start_user_key and upper_bound is
// next_table_min_key. The call it->Seek(*lower_bound) implies we have
// tombstone_end_key.user_key > comp_start_user_key. So
// tombstone_end_key.user_key > smallest_candidate.user_key. For
// smallest_candidate to be > largest_candidate, we need
// tombstone_start.user_key = largest_candidate.user_key = upper_bound =
// next_table_min_key.user_key. This means `has_overlapping_endpoints` is
// true (so meta.largest.user_key = upper_bound) and tombstone.seq_ <
// meta.largest.seqno. So tombstone_start < meta.largest < tombstone_end.
} }
return Status::OK(); return Status::OK();
} }

View file

@ -167,9 +167,15 @@ class CompactionOutputs {
current_output_file_size_ = 0; current_output_file_size_ = 0;
} }
// Add range-dels from the aggregator to the current output file // Add range deletions from the range_del_agg_ to the current output file.
// Input parameters, `range_tombstone_lower_bound_` and current output's
// metadata determine the bounds on range deletions to add. Updates output
// file metadata boundary if extended by range tombstones.
//
// @param comp_start_user_key and comp_end_user_key include timestamp if // @param comp_start_user_key and comp_end_user_key include timestamp if
// user-defined timestamp is enabled. // user-defined timestamp is enabled. Their timestamp should be max timestamp.
// @param next_table_min_key internal key lower bound for the next compaction
// output.
// @param full_history_ts_low used for range tombstone garbage collection. // @param full_history_ts_low used for range tombstone garbage collection.
Status AddRangeDels(const Slice* comp_start_user_key, Status AddRangeDels(const Slice* comp_start_user_key,
const Slice* comp_end_user_key, const Slice* comp_end_user_key,
@ -314,6 +320,7 @@ class CompactionOutputs {
std::unique_ptr<SstPartitioner> partitioner_; std::unique_ptr<SstPartitioner> partitioner_;
// A flag determines if this subcompaction has been split by the cursor // A flag determines if this subcompaction has been split by the cursor
// for RoundRobin compaction
bool is_split_ = false; bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid // We also maintain the output split key for each subcompaction to avoid
@ -345,6 +352,10 @@ class CompactionOutputs {
// for the current output file, how many file boundaries has it crossed, // for the current output file, how many file boundaries has it crossed,
// basically number of files overlapped * 2 // basically number of files overlapped * 2
size_t grandparent_boundary_switched_num_ = 0; size_t grandparent_boundary_switched_num_ = 0;
// The smallest key of the current output file, this is set when current
// output file's smallest key is a range tombstone start key.
InternalKey range_tombstone_lower_bound_;
}; };
// helper struct to concatenate the last level and penultimate level outputs // helper struct to concatenate the last level and penultimate level outputs

View file

@ -84,6 +84,11 @@ class SubcompactionState {
// Assign range dels aggregator, for each range_del, it can only be assigned // Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the // to one output level, for per_key_placement, it's going to be the
// penultimate level. // penultimate level.
// TODO: This does not work for per_key_placement + user-defined timestamp +
// DeleteRange() combo. If user-defined timestamp is enabled,
// it is possible for a range tombstone to belong to bottommost level (
// seqno < earliest snapshot) without being dropped (garbage collection
// for user-defined timestamp).
void AssignRangeDelAggregator( void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) { std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) { if (compaction->SupportsPerKeyPlacement()) {

View file

@ -1661,6 +1661,217 @@ TEST_F(DBRangeDelTest, RangeTombstoneWrittenToMinimalSsts) {
ASSERT_EQ(1, num_range_deletions); ASSERT_EQ(1, num_range_deletions);
} }
TEST_F(DBRangeDelTest, LevelCompactOutputCutAtRangeTombstoneForTtlFiles) {
Options options = CurrentOptions();
options.compression = kNoCompression;
options.compaction_pri = kMinOverlappingRatio;
options.disable_auto_compactions = true;
options.ttl = 24 * 60 * 60; // 24 hours
options.target_file_size_base = 8 << 10;
env_->SetMockSleep();
options.env = env_;
DestroyAndReopen(options);
Random rnd(301);
// Fill some data so that future compactions are not bottommost level
// compaction, and hence they would try cut around files for ttl
for (int i = 5; i < 10; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,1", FilesPerLevel());
for (int i = 5; i < 10; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_EQ("0,1,0,1", FilesPerLevel());
env_->MockSleepForSeconds(20 * 60 * 60);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(11), Key(12)));
ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
ASSERT_OK(Flush());
ASSERT_EQ("1,1,0,1", FilesPerLevel());
// L0 file is new, L1 and L3 file are old and qualified for TTL
env_->MockSleepForSeconds(10 * 60 * 60);
MoveFilesToLevel(1);
// L1 output should be cut into 3 files:
// File 0: Key(0)
// File 1: (qualified for TTL): Key(5) - Key(10)
// File 1: DeleteRange [11, 12)
ASSERT_EQ("0,3,0,1", FilesPerLevel());
}
// Test SST partitioner cut after every single key
class SingleKeySstPartitioner : public SstPartitioner {
public:
const char* Name() const override { return "SingleKeySstPartitioner"; }
PartitionerResult ShouldPartition(
const PartitionerRequest& /*request*/) override {
return kRequired;
}
bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
const Slice& /*largest_user_key*/) override {
return false;
}
};
class SingleKeySstPartitionerFactory : public SstPartitionerFactory {
public:
static const char* kClassName() { return "SingleKeySstPartitionerFactory"; }
const char* Name() const override { return kClassName(); }
std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& /* context */) const override {
return std::unique_ptr<SstPartitioner>(new SingleKeySstPartitioner());
}
};
TEST_F(DBRangeDelTest, CompactionEmitRangeTombstoneToSSTPartitioner) {
Options options = CurrentOptions();
auto factory = std::make_shared<SingleKeySstPartitionerFactory>();
options.sst_partitioner_factory = factory;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
Random rnd(301);
// range deletion keys are not processed when compacting to bottommost level,
// so creating a file at older level to make the next compaction not
// bottommost level
ASSERT_OK(db_->Put(WriteOptions(), Key(4), rnd.RandomString(10)));
ASSERT_OK(Flush());
MoveFilesToLevel(5);
ASSERT_OK(db_->Put(WriteOptions(), Key(1), rnd.RandomString(10)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(5)));
ASSERT_OK(Flush());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
MoveFilesToLevel(1);
// SSTPartitioner decides to cut when range tombstone start key is passed to
// it. Note that the range tombstone [2, 5) itself span multiple keys, but we
// are not able to partition within its range yet.
ASSERT_EQ(2, NumTableFilesAtLevel(1));
}
TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenPointKeyAndTombstone) {
// L2 has 2 files
// L2_0: 0, 1, 2, 3, 4
// L2_1: 5, 6, 7
// L0 has 1 file
// L0: 0, [5, 6), 8
// max_compaction_bytes is less than the size of L2_0 and L2_1.
// When compacting L0 into L1, it should split into 3 files:
// compaction output should cut before key 5 and key 8 to
// limit future compaction size.
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.target_file_size_base = 9 * 1024;
options.max_compaction_bytes = 9 * 1024;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
std::vector<std::string> values;
for (int j = 0; j < kNumPerFile; j++) {
values.push_back(rnd.RandomString(3 << 10));
ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
}
}
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
MoveFilesToLevel(2);
ASSERT_EQ(2, NumTableFilesAtLevel(2));
ASSERT_OK(Put(Key(0), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
Key(6)));
ASSERT_OK(Put(Key(8), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(3, NumTableFilesAtLevel(1));
}
TEST_F(DBRangeDelTest, OversizeCompactionGapBetweenTombstone) {
// L2 has two files
// L2_0: 0, 1, 2, 3, 4. L2_1: 5, 6, 7
// L0 has two range tombstones [0, 1), [7, 8).
// max_compaction_bytes is less than the size of L2_0.
// When compacting L0 into L1, the two range tombstones should be
// split into two files.
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.target_file_size_base = 9 * 1024;
options.max_compaction_bytes = 9 * 1024;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < kNumFiles; ++i) {
std::vector<std::string> values;
// Write 12K (4 values, each 3K)
for (int j = 0; j < kNumPerFile; j++) {
values.push_back(rnd.RandomString(3 << 10));
ASSERT_OK(Put(Key(i * kNumPerFile + j), values[j]));
}
}
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
MoveFilesToLevel(2);
ASSERT_EQ(2, NumTableFilesAtLevel(2));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(1)));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(7),
Key(8)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
// This is L0 -> L1 compaction
// The two range tombstones are broken up into two output files
// to limit compaction size.
ASSERT_EQ(2, NumTableFilesAtLevel(1));
}
TEST_F(DBRangeDelTest, OversizeCompactionPointKeyWithinRangetombstone) {
// L2 has two files
// L2_0: 0, 1, 2, 3, 4. L2_1: 6, 7, 8
// L0 has [0, 9) and point key 5
// max_compaction_bytes is less than the size of L2_0.
// When compacting L0 into L1, the compaction should cut at point key 5.
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.target_file_size_base = 9 * 1024;
options.max_compaction_bytes = 9 * 1024;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 9; ++i) {
if (i == 5) {
++i;
}
ASSERT_OK(Put(Key(i), rnd.RandomString(3 << 10)));
}
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
MoveFilesToLevel(2);
ASSERT_EQ(2, NumTableFilesAtLevel(2));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(0),
Key(9)));
ASSERT_OK(Put(Key(5), rnd.RandomString(1 << 10)));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
true /* disallow_trivial_move */));
ASSERT_EQ(2, NumTableFilesAtLevel(1));
}
TEST_F(DBRangeDelTest, OverlappedTombstones) { TEST_F(DBRangeDelTest, OverlappedTombstones) {
const int kNumPerFile = 4, kNumFiles = 2; const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -2093,6 +2304,7 @@ TEST_F(DBRangeDelTest, NonOverlappingTombstonAtBoundary) {
options.compression = kNoCompression; options.compression = kNoCompression;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.target_file_size_base = 2 * 1024; options.target_file_size_base = 2 * 1024;
options.level_compaction_dynamic_file_size = false;
DestroyAndReopen(options); DestroyAndReopen(options);
Random rnd(301); Random rnd(301);
@ -2508,7 +2720,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTest) {
options.compression = kNoCompression; options.compression = kNoCompression;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024; options.target_file_size_base = 3 * 1024;
options.max_compaction_bytes = 1024; options.max_compaction_bytes = 2048;
DestroyAndReopen(options); DestroyAndReopen(options);
// L2 // L2
@ -2554,7 +2766,7 @@ TEST_F(DBRangeDelTest, LeftSentinelKeyTestWithNewerKey) {
options.compression = kNoCompression; options.compression = kNoCompression;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.target_file_size_base = 3 * 1024; options.target_file_size_base = 3 * 1024;
options.max_compaction_bytes = 1024; options.max_compaction_bytes = 3 * 1024;
DestroyAndReopen(options); DestroyAndReopen(options);
// L2 // L2
@ -3015,6 +3227,183 @@ TEST_F(DBRangeDelTest, DoubleCountRangeTombstoneCompensatedSize) {
db_->ReleaseSnapshot(snapshot); db_->ReleaseSnapshot(snapshot);
} }
TEST_F(DBRangeDelTest, AddRangeDelsSameLowerAndUpperBound) {
// Test for an edge case where CompactionOutputs::AddRangeDels()
// is called with an empty range: `range_tombstone_lower_bound_` is not empty
// and have the same user_key and sequence number as `next_table_min_key.
// This used to cause file's smallest and largest key to be incorrectly set
// such that smallest > largest, and fail some assertions in iterator and/or
// assertion in VersionSet::ApproximateSize().
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.target_file_size_base = 1 << 10;
opts.level_compaction_dynamic_file_size = false;
DestroyAndReopen(opts);
Random rnd(301);
// Create file at bottommost level so the manual compaction below is
// non-bottommost level and goes through code path like compensate range
// tombstone size.
ASSERT_OK(Put(Key(1), "v1"));
ASSERT_OK(Put(Key(4), "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
ASSERT_OK(Put(Key(1), rnd.RandomString(4 << 10)));
ASSERT_OK(Put(Key(3), rnd.RandomString(4 << 10)));
// So Key(3) does not get dropped.
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(4)));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(3), rnd.RandomString(4 << 10)));
ASSERT_OK(Put(Key(4), rnd.RandomString(4 << 10)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
// Each file will have two keys, with Key(3) straddle between two files.
// File 1: Key(1)@1, Key(3)@6, DeleteRange ends at Key(3)@6
// File 2: Key(3)@4, Key(4)@7, DeleteRange start from Key(3)@4
ASSERT_EQ(NumTableFilesAtLevel(1), 2);
// Manually update compaction output file cutting decisions
// to cut before range tombstone sentinel Key(3)@4
// and the point key Key(3)@4 itself
SyncPoint::GetInstance()->SetCallBack(
"CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) {
auto* pair = (std::pair<bool*, const Slice>*)p;
if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) ==
0) &&
(GetInternalKeySeqno(pair->second) <= 4)) {
*(pair->first) = true;
}
});
SyncPoint::GetInstance()->EnableProcessing();
std::string begin_key = Key(0);
std::string end_key = Key(5);
Slice begin_slice{begin_key};
Slice end_slice{end_key};
ASSERT_OK(dbfull()->RunManualCompaction(
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd(),
1, 2, CompactRangeOptions(), &begin_slice, &end_slice, true,
true /* disallow_trivial_move */,
std::numeric_limits<uint64_t>::max() /*max_file_num_to_ignore*/,
"" /*trim_ts*/));
// iterate through to check if any assertion breaks
std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
iter->SeekToFirst();
std::vector<int> expected{1, 3, 4};
for (auto i : expected) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
iter->Next();
}
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, AddRangeDelsSingleUserKeyTombstoneOnlyFile) {
// Test for an edge case where CompactionOutputs::AddRangeDels()
// is called with an SST file that has no point keys, and that
// the lower bound and upper bound have the same user key.
// This could cause a file's smallest and largest key to be incorrectly set
// such that smallest > largest, and fail some assertions in iterator and/or
// assertion in VersionSet::ApproximateSize().
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.target_file_size_base = 1 << 10;
opts.level_compaction_dynamic_file_size = false;
DestroyAndReopen(opts);
Random rnd(301);
// Create file at bottommost level so the manual compaction below is
// non-bottommost level and goes through code path like compensate range
// tombstone size.
ASSERT_OK(Put(Key(1), "v1"));
ASSERT_OK(Put(Key(4), "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
ASSERT_OK(Put(Key(1), rnd.RandomString(10)));
// Key(3)@4
ASSERT_OK(Put(Key(3), rnd.RandomString(10)));
const Snapshot* snapshot1 = db_->GetSnapshot();
// Key(3)@5
ASSERT_OK(Put(Key(3), rnd.RandomString(10)));
const Snapshot* snapshot2 = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(2),
Key(4)));
// Key(3)@7
ASSERT_OK(Put(Key(3), rnd.RandomString(10)));
ASSERT_OK(Flush());
// L0 -> L1 compaction: cut output into two files:
// File 1: Key(1), Key(3)@7, Range tombstone ends at Key(3)@7
// File 2: Key(3)@5, Key(3)@4, Range tombstone starts from Key(3)@5
SyncPoint::GetInstance()->SetCallBack(
"CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) {
auto* pair = (std::pair<bool*, const Slice>*)p;
if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) ==
0) &&
(GetInternalKeySeqno(pair->second) <= 6)) {
*(pair->first) = true;
SyncPoint::GetInstance()->DisableProcessing();
}
});
SyncPoint::GetInstance()->EnableProcessing();
std::string begin_key = Key(0);
std::string end_key = Key(5);
Slice begin_slice{begin_key};
Slice end_slice{end_key};
ASSERT_OK(dbfull()->RunManualCompaction(
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd(),
0, 1, CompactRangeOptions(), &begin_slice, &end_slice, true,
true /* disallow_trivial_move */,
std::numeric_limits<uint64_t>::max() /*max_file_num_to_ignore*/,
"" /*trim_ts*/));
ASSERT_EQ(NumTableFilesAtLevel(1), 2);
// L1 -> L2 compaction, drop the snapshot protecting Key(3)@5.
// Let ShouldStopBefore() return true for Key(3)@5 (delete range sentinel)
// and Key(3)@4.
// Output should have two files:
// File 1: Key(1), Key(3)@7, range tombstone ends at Key(3)@7
// File dropped: range tombstone only file (from Key(3)@5 to Key(3)@4)
// File 2: Range tombstone starting from Key(3)@4, Key(3)@4
db_->ReleaseSnapshot(snapshot2);
SyncPoint::GetInstance()->SetCallBack(
"CompactionOutputs::ShouldStopBefore::manual_decision", [opts](void* p) {
auto* pair = (std::pair<bool*, const Slice>*)p;
if ((opts.comparator->Compare(ExtractUserKey(pair->second), Key(3)) ==
0) &&
(GetInternalKeySeqno(pair->second) <= 6)) {
*(pair->first) = true;
}
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->RunManualCompaction(
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd(),
1, 2, CompactRangeOptions(), &begin_slice, &end_slice, true,
true /* disallow_trivial_move */,
std::numeric_limits<uint64_t>::max() /*max_file_num_to_ignore*/,
"" /*trim_ts*/));
ASSERT_EQ(NumTableFilesAtLevel(2), 2);
// iterate through to check if any assertion breaks
std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
iter->SeekToFirst();
std::vector<int> expected{1, 3, 4};
for (auto i : expected) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
iter->Next();
}
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
db_->ReleaseSnapshot(snapshot1);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View file

@ -86,8 +86,10 @@ inline bool IsValueType(ValueType t) {
// Checks whether a type is from user operation // Checks whether a type is from user operation
// kTypeRangeDeletion is in meta block so this API is separated from above // kTypeRangeDeletion is in meta block so this API is separated from above
// kTypeMaxValid can be from keys generated by
// TruncatedRangeDelIterator::start_key()
inline bool IsExtendedValueType(ValueType t) { inline bool IsExtendedValueType(ValueType t) {
return IsValueType(t) || t == kTypeRangeDeletion; return IsValueType(t) || t == kTypeRangeDeletion || t == kTypeMaxValid;
} }
// We leave eight bits empty at the bottom so a type and sequence# // We leave eight bits empty at the bottom so a type and sequence#

View file

@ -82,6 +82,10 @@ class HistoryTrimmingIterator : public InternalIterator {
bool IsValuePinned() const override { return input_->IsValuePinned(); } bool IsValuePinned() const override { return input_->IsValuePinned(); }
bool IsDeleteRangeSentinelKey() const override {
return input_->IsDeleteRangeSentinelKey();
}
private: private:
InternalIterator* input_; InternalIterator* input_;
const std::string filter_ts_; const std::string filter_ts_;

View file

@ -231,6 +231,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
s = Status::ShutdownInProgress(); s = Status::ShutdownInProgress();
return s; return s;
} }
// Skip range tombstones emitted by the compaction iterator.
if (iter->IsDeleteRangeSentinelKey()) {
continue;
}
ParsedInternalKey ikey; ParsedInternalKey ikey;
assert(keys_.size() == merge_context_.GetNumOperands()); assert(keys_.size() == merge_context_.GetNumOperands());

View file

@ -36,6 +36,7 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator(
Status pik_status = ParseInternalKey(smallest->Encode(), &parsed_smallest, Status pik_status = ParseInternalKey(smallest->Encode(), &parsed_smallest,
false /* log_err_key */); // TODO false /* log_err_key */); // TODO
pik_status.PermitUncheckedError(); pik_status.PermitUncheckedError();
parsed_smallest.type = kTypeMaxValid;
assert(pik_status.ok()); assert(pik_status.ok());
smallest_ = &parsed_smallest; smallest_ = &parsed_smallest;
} }
@ -70,7 +71,7 @@ TruncatedRangeDelIterator::TruncatedRangeDelIterator(
parsed_largest.sequence -= 1; parsed_largest.sequence -= 1;
// This line is not needed for correctness, but it ensures that the // This line is not needed for correctness, but it ensures that the
// truncated end key is not covering keys from the next SST file. // truncated end key is not covering keys from the next SST file.
parsed_largest.type = kValueTypeForSeek; parsed_largest.type = kTypeMaxValid;
} }
largest_ = &parsed_largest; largest_ = &parsed_largest;
} }
@ -393,21 +394,20 @@ bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
namespace { namespace {
// Produce a sorted (by start internal key) stream of range tombstones from // Produce a sorted (by start internal key) stream of range tombstones from
// `children`. lower_bound and upper_bound on user key can be // `children`. lower_bound and upper_bound on internal key can be
// optionally specified. Range tombstones that ends before lower_bound or starts // optionally specified. Range tombstones that ends before lower_bound or starts
// after upper_bound are excluded. // after upper_bound are excluded.
// If user-defined timestamp is enabled, lower_bound and upper_bound should // If user-defined timestamp is enabled, lower_bound and upper_bound should
// contain timestamp, but comparison is done ignoring timestamps. // contain timestamp.
class TruncatedRangeDelMergingIter : public InternalIterator { class TruncatedRangeDelMergingIter : public InternalIterator {
public: public:
TruncatedRangeDelMergingIter( TruncatedRangeDelMergingIter(
const InternalKeyComparator* icmp, const Slice* lower_bound, const InternalKeyComparator* icmp, const Slice* lower_bound,
const Slice* upper_bound, bool upper_bound_inclusive, const Slice* upper_bound,
const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children) const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children)
: icmp_(icmp), : icmp_(icmp),
lower_bound_(lower_bound), lower_bound_(lower_bound),
upper_bound_(upper_bound), upper_bound_(upper_bound),
upper_bound_inclusive_(upper_bound_inclusive),
heap_(StartKeyMinComparator(icmp)), heap_(StartKeyMinComparator(icmp)),
ts_sz_(icmp_->user_comparator()->timestamp_size()) { ts_sz_(icmp_->user_comparator()->timestamp_size()) {
for (auto& child : children) { for (auto& child : children) {
@ -420,7 +420,7 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
} }
bool Valid() const override { bool Valid() const override {
return !heap_.empty() && BeforeEndKey(heap_.top()); return !heap_.empty() && !AfterEndKey(heap_.top());
} }
Status status() const override { return Status::OK(); } Status status() const override { return Status::OK(); }
@ -428,7 +428,13 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
heap_.clear(); heap_.clear();
for (auto& child : children_) { for (auto& child : children_) {
if (lower_bound_ != nullptr) { if (lower_bound_ != nullptr) {
child->Seek(*lower_bound_); child->Seek(ExtractUserKey(*lower_bound_));
// Since the above `Seek()` operates on a user key while `lower_bound_`
// is an internal key, we may need to advance `child` farther for it to
// be in bounds.
while (child->Valid() && BeforeStartKey(child)) {
child->InternalNext();
}
} else { } else {
child->SeekToFirst(); child->SeekToFirst();
} }
@ -481,19 +487,23 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
void SeekToLast() override { assert(false); } void SeekToLast() override { assert(false); }
private: private:
bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const { bool BeforeStartKey(const TruncatedRangeDelIterator* iter) const {
if (upper_bound_ == nullptr) { if (lower_bound_ == nullptr) {
return true; return false;
} }
int cmp = icmp_->user_comparator()->CompareWithoutTimestamp( return icmp_->Compare(iter->end_key(), *lower_bound_) <= 0;
iter->start_key().user_key, *upper_bound_); }
return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0;
bool AfterEndKey(const TruncatedRangeDelIterator* iter) const {
if (upper_bound_ == nullptr) {
return false;
}
return icmp_->Compare(iter->start_key(), *upper_bound_) > 0;
} }
const InternalKeyComparator* icmp_; const InternalKeyComparator* icmp_;
const Slice* lower_bound_; const Slice* lower_bound_;
const Slice* upper_bound_; const Slice* upper_bound_;
bool upper_bound_inclusive_;
BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_; BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_;
std::vector<TruncatedRangeDelIterator*> children_; std::vector<TruncatedRangeDelIterator*> children_;
@ -506,11 +516,10 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
std::unique_ptr<FragmentedRangeTombstoneIterator> std::unique_ptr<FragmentedRangeTombstoneIterator>
CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound, CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound,
const Slice* upper_bound, const Slice* upper_bound) {
bool upper_bound_inclusive) {
InvalidateRangeDelMapPositions(); InvalidateRangeDelMapPositions();
auto merging_iter = std::make_unique<TruncatedRangeDelMergingIter>( auto merging_iter = std::make_unique<TruncatedRangeDelMergingIter>(
icmp_, lower_bound, upper_bound, upper_bound_inclusive, parent_iters_); icmp_, lower_bound, upper_bound, parent_iters_);
auto fragmented_tombstone_list = auto fragmented_tombstone_list =
std::make_shared<FragmentedRangeTombstoneList>( std::make_shared<FragmentedRangeTombstoneList>(

View file

@ -452,16 +452,15 @@ class CompactionRangeDelAggregator : public RangeDelAggregator {
} }
// Creates an iterator over all the range tombstones in the aggregator, for // Creates an iterator over all the range tombstones in the aggregator, for
// use in compaction. Nullptr arguments indicate that the iterator range is // use in compaction.
// unbounded. //
// NOTE: the boundaries are used for optimization purposes to reduce the // NOTE: the internal key boundaries are used for optimization purposes to
// number of tombstones that are passed to the fragmenter; they do not // reduce the number of tombstones that are passed to the fragmenter; they do
// guarantee that the resulting iterator only contains range tombstones that // not guarantee that the resulting iterator only contains range tombstones
// cover keys in the provided range. If required, these bounds must be // that cover keys in the provided range. If required, these bounds must be
// enforced during iteration. // enforced during iteration.
std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator( std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator(
const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr, const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr);
bool upper_bound_inclusive = false);
private: private:
std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_; std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_;

View file

@ -224,26 +224,32 @@ TEST_F(RangeDelAggregatorTest, UntruncatedIter) {
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr); nullptr);
VerifyIterator(&iter, bytewise_icmp, VerifyIterator(
{{UncutEndpoint("a"), UncutEndpoint("e"), 10}, &iter, bytewise_icmp,
{UncutEndpoint("e"), UncutEndpoint("g"), 8}, {{InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"), 10},
{UncutEndpoint("j"), UncutEndpoint("n"), 4}}); {InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}});
VerifySeek( VerifySeek(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, {{"d", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"),
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, 10},
{"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, {"ia", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4},
{"", UncutEndpoint("a"), UncutEndpoint("e"), 10}}); {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
true /* invalid */},
{"", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"),
10}});
VerifySeekForPrev( VerifySeekForPrev(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, {{"d", InternalValue("a", 10, kTypeRangeDeletion), UncutEndpoint("e"),
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, 10},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); {"n", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4},
{"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
true /* invalid */}});
} }
TEST_F(RangeDelAggregatorTest, UntruncatedIterWithSnapshot) { TEST_F(RangeDelAggregatorTest, UntruncatedIterWithSnapshot) {
@ -258,25 +264,29 @@ TEST_F(RangeDelAggregatorTest, UntruncatedIterWithSnapshot) {
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr); nullptr);
VerifyIterator(&iter, bytewise_icmp, VerifyIterator(
{{UncutEndpoint("e"), UncutEndpoint("g"), 8}, &iter, bytewise_icmp,
{UncutEndpoint("j"), UncutEndpoint("n"), 4}}); {{InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4}});
VerifySeek( VerifySeek(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {{"d", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, {"ia", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
{"", UncutEndpoint("e"), UncutEndpoint("g"), 8}}); true /* invalid */},
{"", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8}});
VerifySeekForPrev( VerifySeekForPrev(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, {{"d", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, true /* invalid */},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); {"n", InternalValue("j", 4, kTypeRangeDeletion), UncutEndpoint("n"), 4},
{"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
true /* invalid */}});
} }
TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) { TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) {
@ -295,27 +305,30 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterPartiallyCutTombstones) {
VerifyIterator( VerifyIterator(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{InternalValue("d", 7), UncutEndpoint("e"), 10}, {{InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10},
{UncutEndpoint("e"), UncutEndpoint("g"), 8}, {InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4}}); {InternalValue("j", 4, kTypeRangeDeletion),
InternalValue("m", 8, kTypeMaxValid), 4}});
VerifySeek( VerifySeek(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, {{"d", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4, {"ia", InternalValue("j", 4, kTypeRangeDeletion),
false /* invalid */}, InternalValue("m", 8, kTypeMaxValid), 4, false /* invalid */},
{"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, {"n", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
{"", InternalValue("d", 7), UncutEndpoint("e"), 10}}); true /* invalid */},
{"", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10}});
VerifySeekForPrev( VerifySeekForPrev(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, {{"d", InternalValue("d", 7, kTypeMaxValid), UncutEndpoint("e"), 10},
{"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"e", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, {"ia", InternalValue("e", 8, kTypeRangeDeletion), UncutEndpoint("g"), 8},
{"n", UncutEndpoint("j"), InternalValue("m", 8, kValueTypeForSeek), 4, {"n", InternalValue("j", 4, kTypeRangeDeletion),
false /* invalid */}, InternalValue("m", 8, kTypeMaxValid), 4, false /* invalid */},
{"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); {"", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
true /* invalid */}});
} }
TEST_F(RangeDelAggregatorTest, TruncatedIterFullyCutTombstones) { TEST_F(RangeDelAggregatorTest, TruncatedIterFullyCutTombstones) {
@ -332,20 +345,23 @@ TEST_F(RangeDelAggregatorTest, TruncatedIterFullyCutTombstones) {
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp,
&smallest, &largest); &smallest, &largest);
VerifyIterator(&iter, bytewise_icmp, VerifyIterator(
{{InternalValue("f", 7), UncutEndpoint("g"), 8}}); &iter, bytewise_icmp,
{{InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}});
VerifySeek( VerifySeek(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", InternalValue("f", 7), UncutEndpoint("g"), 8}, {{"d", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8},
{"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, {"f", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8},
{"j", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); {"j", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
true /* invalid */}});
VerifySeekForPrev( VerifySeekForPrev(
&iter, bytewise_icmp, &iter, bytewise_icmp,
{{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, {{"d", InternalValue("", 0, kTypeRangeDeletion), UncutEndpoint(""), 0,
{"f", InternalValue("f", 7), UncutEndpoint("g"), 8}, true /* invalid */},
{"j", InternalValue("f", 7), UncutEndpoint("g"), 8}}); {"f", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8},
{"j", InternalValue("f", 7, kTypeMaxValid), UncutEndpoint("g"), 8}});
} }
TEST_F(RangeDelAggregatorTest, SingleIterInAggregator) { TEST_F(RangeDelAggregatorTest, SingleIterInAggregator) {
@ -627,15 +643,12 @@ TEST_F(RangeDelAggregatorTest, CompactionAggregatorEmptyIteratorRight) {
range_del_agg.AddTombstones(std::move(input_iter)); range_del_agg.AddTombstones(std::move(input_iter));
} }
Slice start("p"); InternalKey start_buf("p", 0, kTypeRangeDeletion);
Slice end("q"); InternalKey end_buf("q", 0, kTypeRangeDeletion);
auto range_del_compaction_iter1 = Slice start = start_buf.Encode();
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); Slice end = end_buf.Encode();
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {}); auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end);
VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {});
auto range_del_compaction_iter2 =
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {});
} }
TEST_F(RangeDelAggregatorTest, CompactionAggregatorBoundedIterator) { TEST_F(RangeDelAggregatorTest, CompactionAggregatorBoundedIterator) {
@ -652,18 +665,13 @@ TEST_F(RangeDelAggregatorTest, CompactionAggregatorBoundedIterator) {
range_del_agg.AddTombstones(std::move(input_iter)); range_del_agg.AddTombstones(std::move(input_iter));
} }
Slice start("bb"); InternalKey start_buf("bb", 0, kTypeRangeDeletion);
Slice end("e"); InternalKey end_buf("e", 9, kTypeRangeDeletion);
auto range_del_compaction_iter1 = Slice start = start_buf.Encode();
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); Slice end = end_buf.Encode();
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end);
VerifyFragmentedRangeDels(range_del_compaction_iter.get(),
{{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}}); {{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}});
auto range_del_compaction_iter2 =
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
VerifyFragmentedRangeDels(
range_del_compaction_iter2.get(),
{{"a", "c", 10}, {"c", "e", 10}, {"c", "e", 8}, {"e", "g", 8}});
} }
TEST_F(RangeDelAggregatorTest, TEST_F(RangeDelAggregatorTest,
@ -681,22 +689,12 @@ TEST_F(RangeDelAggregatorTest,
range_del_agg.AddTombstones(std::move(input_iter)); range_del_agg.AddTombstones(std::move(input_iter));
} }
Slice start("bb"); InternalKey start_buf("bb", 0, kTypeRangeDeletion);
Slice end("e"); InternalKey end_buf("e", 0, kTypeRangeDeletion);
auto range_del_compaction_iter1 = Slice start = start_buf.Encode();
range_del_agg.NewIterator(&start, &end, false /* end_key_inclusive */); Slice end = end_buf.Encode();
VerifyFragmentedRangeDels(range_del_compaction_iter1.get(), {{"a", "b", 10}, auto range_del_compaction_iter = range_del_agg.NewIterator(&start, &end);
{"b", "c", 20}, VerifyFragmentedRangeDels(range_del_compaction_iter.get(), {{"a", "b", 10},
{"b", "c", 10},
{"c", "d", 10},
{"c", "d", 8},
{"d", "f", 30},
{"d", "f", 8},
{"f", "g", 8}});
auto range_del_compaction_iter2 =
range_del_agg.NewIterator(&start, &end, true /* end_key_inclusive */);
VerifyFragmentedRangeDels(range_del_compaction_iter2.get(), {{"a", "b", 10},
{"b", "c", 20}, {"b", "c", 20},
{"b", "c", 10}, {"b", "c", 10},
{"c", "d", 10}, {"c", "d", 10},

View file

@ -218,8 +218,7 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
} }
ParsedInternalKey parsed_start_key() const { ParsedInternalKey parsed_start_key() const {
return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber, return ParsedInternalKey(pos_->start_key, seq(), kTypeRangeDeletion);
kTypeRangeDeletion);
} }
ParsedInternalKey parsed_end_key() const { ParsedInternalKey parsed_end_key() const {
return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber, return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber,

View file

@ -38,6 +38,8 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_builder.h" #include "db/version_builder.h"
#include "db/version_edit_handler.h" #include "db/version_edit_handler.h"
#include "table/compaction_merging_iterator.h"
#if USE_COROUTINES #if USE_COROUTINES
#include "folly/experimental/coro/BlockingWait.h" #include "folly/experimental/coro/BlockingWait.h"
#include "folly/experimental/coro/Collect.h" #include "folly/experimental/coro/Collect.h"
@ -6635,6 +6637,14 @@ InternalIterator* VersionSet::MakeInputIterator(
c->num_input_levels() - 1 c->num_input_levels() - 1
: c->num_input_levels()); : c->num_input_levels());
InternalIterator** list = new InternalIterator*[space]; InternalIterator** list = new InternalIterator*[space];
// First item in the pair is a pointer to range tombstones.
// Second item is a pointer to a member of a LevelIterator,
// that will be initialized to where CompactionMergingIterator stores
// pointer to its range tombstones. This is used by LevelIterator
// to update pointer to range tombstones as it traverse different SST files.
std::vector<
std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
range_tombstones;
size_t num = 0; size_t num = 0;
for (size_t which = 0; which < c->num_input_levels(); which++) { for (size_t which = 0; which < c->num_input_levels(); which++) {
if (c->input_levels(which)->num_files != 0) { if (c->input_levels(which)->num_files != 0) {
@ -6655,7 +6665,7 @@ InternalIterator* VersionSet::MakeInputIterator(
end.value(), fmd.smallest.user_key()) < 0) { end.value(), fmd.smallest.user_key()) < 0) {
continue; continue;
} }
TruncatedRangeDelIterator* range_tombstone_iter = nullptr;
list[num++] = cfd->table_cache()->NewIterator( list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions, read_options, file_options_compactions,
cfd->internal_comparator(), fmd, range_del_agg, cfd->internal_comparator(), fmd, range_del_agg,
@ -6668,10 +6678,13 @@ InternalIterator* VersionSet::MakeInputIterator(
MaxFileSizeForL0MetaPin(*c->mutable_cf_options()), MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr, /*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, /*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false); /*allow_unprepared_value=*/false,
/*range_del_iter=*/&range_tombstone_iter);
range_tombstones.emplace_back(range_tombstone_iter, nullptr);
} }
} else { } else {
// Create concatenating iterator for the files from this level // Create concatenating iterator for the files from this level
TruncatedRangeDelIterator*** tombstone_iter_ptr = nullptr;
list[num++] = new LevelIterator( list[num++] = new LevelIterator(
cfd->table_cache(), read_options, file_options_compactions, cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), c->input_levels(which), cfd->internal_comparator(), c->input_levels(which),
@ -6680,14 +6693,15 @@ InternalIterator* VersionSet::MakeInputIterator(
/*no per level latency histogram=*/nullptr, /*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false, TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)), range_del_agg, /*level=*/static_cast<int>(c->level(which)), range_del_agg,
c->boundaries(which)); c->boundaries(which), false, &tombstone_iter_ptr);
range_tombstones.emplace_back(nullptr, tombstone_iter_ptr);
} }
} }
} }
assert(num <= space); assert(num <= space);
InternalIterator* result = InternalIterator* result = NewCompactionMergingIterator(
NewMergingIterator(&c->column_family_data()->internal_comparator(), list, &c->column_family_data()->internal_comparator(), list,
static_cast<int>(num)); static_cast<int>(num), range_tombstones);
delete[] list; delete[] list;
return result; return result;
} }

1
src.mk
View file

@ -198,6 +198,7 @@ LIB_SOURCES = \
table/get_context.cc \ table/get_context.cc \
table/iterator.cc \ table/iterator.cc \
table/merging_iterator.cc \ table/merging_iterator.cc \
table/compaction_merging_iterator.cc \
table/meta_blocks.cc \ table/meta_blocks.cc \
table/persistent_cache_helper.cc \ table/persistent_cache_helper.cc \
table/plain/plain_table_bloom.cc \ table/plain/plain_table_bloom.cc \

View file

@ -0,0 +1,370 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/compaction_merging_iterator.h"
namespace ROCKSDB_NAMESPACE {
class CompactionMergingIterator : public InternalIterator {
public:
CompactionMergingIterator(
const InternalKeyComparator* comparator, InternalIterator** children,
int n, bool is_arena_mode,
std::vector<
std::pair<TruncatedRangeDelIterator*, TruncatedRangeDelIterator***>>
range_tombstones)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
minHeap_(CompactionHeapItemComparator(comparator_)),
pinned_iters_mgr_(nullptr) {
children_.resize(n);
for (int i = 0; i < n; i++) {
children_[i].level = i;
children_[i].iter.Set(children[i]);
assert(children_[i].type == HeapItem::ITERATOR);
}
assert(range_tombstones.size() == static_cast<size_t>(n));
for (auto& p : range_tombstones) {
range_tombstone_iters_.push_back(p.first);
}
pinned_heap_item_.resize(n);
for (int i = 0; i < n; ++i) {
if (range_tombstones[i].second) {
// for LevelIterator
*range_tombstones[i].second = &range_tombstone_iters_[i];
}
pinned_heap_item_[i].level = i;
pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
}
}
void considerStatus(const Status& s) {
if (!s.ok() && status_.ok()) {
status_ = s;
}
}
~CompactionMergingIterator() override {
// TODO: use unique_ptr for range_tombstone_iters_
for (auto child : range_tombstone_iters_) {
delete child;
}
for (auto& child : children_) {
child.iter.DeleteIter(is_arena_mode_);
}
status_.PermitUncheckedError();
}
bool Valid() const override { return current_ != nullptr && status_.ok(); }
Status status() const override { return status_; }
void SeekToFirst() override;
void Seek(const Slice& target) override;
void Next() override;
Slice key() const override {
assert(Valid());
return current_->key();
}
Slice value() const override {
assert(Valid());
if (LIKELY(current_->type == HeapItem::ITERATOR)) {
return current_->iter.value();
} else {
return dummy_tombstone_val;
}
}
// Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
// from current child iterator. Potentially as long as one of child iterator
// report out of bound is not possible, we know current key is within bound.
bool MayBeOutOfLowerBound() override {
assert(Valid());
return current_->type == HeapItem::DELETE_RANGE_START ||
current_->iter.MayBeOutOfLowerBound();
}
IterBoundCheck UpperBoundCheckResult() override {
assert(Valid());
return current_->type == HeapItem::DELETE_RANGE_START
? IterBoundCheck::kUnknown
: current_->iter.UpperBoundCheckResult();
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
for (auto& child : children_) {
child.iter.SetPinnedItersMgr(pinned_iters_mgr);
}
}
bool IsDeleteRangeSentinelKey() const override {
assert(Valid());
return current_->type == HeapItem::DELETE_RANGE_START;
}
// Compaction uses the above subset of InternalIterator interface.
void SeekToLast() override { assert(false); }
void SeekForPrev(const Slice&) override { assert(false); }
void Prev() override { assert(false); }
bool NextAndGetResult(IterateResult*) override {
assert(false);
return false;
}
bool IsKeyPinned() const override {
assert(false);
return false;
}
bool IsValuePinned() const override {
assert(false);
return false;
}
bool PrepareValue() override {
assert(false);
return false;
}
private:
struct HeapItem {
HeapItem() = default;
IteratorWrapper iter;
size_t level = 0;
std::string tombstone_str;
enum Type { ITERATOR, DELETE_RANGE_START };
Type type = ITERATOR;
explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
: level(_level), type(Type::ITERATOR) {
iter.Set(_iter);
}
void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
tombstone_str.clear();
AppendInternalKey(&tombstone_str, pik);
}
[[nodiscard]] Slice key() const {
return type == ITERATOR ? iter.key() : tombstone_str;
}
};
class CompactionHeapItemComparator {
public:
explicit CompactionHeapItemComparator(
const InternalKeyComparator* comparator)
: comparator_(comparator) {}
bool operator()(HeapItem* a, HeapItem* b) const {
int r = comparator_->Compare(a->key(), b->key());
// For each file, we assume all range tombstone start keys come before
// its file boundary sentinel key (file's meta.largest key).
// In the case when meta.smallest = meta.largest and range tombstone start
// key is truncated at meta.smallest, the start key will have op_type =
// kMaxValid to make it smaller (see TruncatedRangeDelIterator
// constructor). The following assertion validates this assumption.
assert(a->type == b->type || r != 0);
return r > 0;
}
private:
const InternalKeyComparator* comparator_;
};
using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
bool is_arena_mode_;
const InternalKeyComparator* comparator_;
// HeapItem for all child point iterators.
std::vector<HeapItem> children_;
// HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
// current range tombstone from range_tombstone_iters_[i].
std::vector<HeapItem> pinned_heap_item_;
// range_tombstone_iters_[i] contains range tombstones in the sorted run that
// corresponds to children_[i]. range_tombstone_iters_[i] ==
// nullptr means the sorted run of children_[i] does not have range
// tombstones (or the current SSTable does not have range tombstones in the
// case of LevelIterator).
std::vector<TruncatedRangeDelIterator*> range_tombstone_iters_;
// Used as value for range tombstone keys
std::string dummy_tombstone_val{};
// Skip file boundary sentinel keys.
void FindNextVisibleKey();
// top of minHeap_
HeapItem* current_;
// If any of the children have non-ok status, this is one of them.
Status status_;
CompactionMinHeap minHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;
// Process a child that is not in the min heap.
// If valid, add to the min heap. Otherwise, check status.
void AddToMinHeapOrCheckStatus(HeapItem*);
HeapItem* CurrentForward() const {
return !minHeap_.empty() ? minHeap_.top() : nullptr;
}
void InsertRangeTombstoneAtLevel(size_t level) {
if (range_tombstone_iters_[level]->Valid()) {
pinned_heap_item_[level].SetTombstoneForCompaction(
range_tombstone_iters_[level]->start_key());
minHeap_.push(&pinned_heap_item_[level]);
}
}
};
void CompactionMergingIterator::SeekToFirst() {
minHeap_.clear();
status_ = Status::OK();
for (auto& child : children_) {
child.iter.SeekToFirst();
AddToMinHeapOrCheckStatus(&child);
}
for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
if (range_tombstone_iters_[i]) {
range_tombstone_iters_[i]->SeekToFirst();
InsertRangeTombstoneAtLevel(i);
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::Seek(const Slice& target) {
minHeap_.clear();
status_ = Status::OK();
for (auto& child : children_) {
child.iter.Seek(target);
AddToMinHeapOrCheckStatus(&child);
}
ParsedInternalKey pik;
ParseInternalKey(target, &pik, false /* log_err_key */)
.PermitUncheckedError();
for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
if (range_tombstone_iters_[i]) {
range_tombstone_iters_[i]->Seek(pik.user_key);
// For compaction, output keys should all be after seek target.
while (range_tombstone_iters_[i]->Valid() &&
comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
0) {
range_tombstone_iters_[i]->Next();
}
InsertRangeTombstoneAtLevel(i);
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::Next() {
assert(Valid());
// For the heap modifications below to be correct, current_ must be the
// current top of the heap.
assert(current_ == CurrentForward());
// as the current points to the current record. move the iterator forward.
if (current_->type == HeapItem::ITERATOR) {
current_->iter.Next();
if (current_->iter.Valid()) {
// current is still valid after the Next() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
assert(current_->iter.status().ok());
minHeap_.replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
considerStatus(current_->iter.status());
minHeap_.pop();
}
} else {
assert(current_->type == HeapItem::DELETE_RANGE_START);
size_t level = current_->level;
assert(range_tombstone_iters_[level]);
range_tombstone_iters_[level]->Next();
if (range_tombstone_iters_[level]->Valid()) {
pinned_heap_item_[level].SetTombstoneForCompaction(
range_tombstone_iters_[level]->start_key());
minHeap_.replace_top(&pinned_heap_item_[level]);
} else {
minHeap_.pop();
}
}
FindNextVisibleKey();
current_ = CurrentForward();
}
void CompactionMergingIterator::FindNextVisibleKey() {
while (!minHeap_.empty()) {
HeapItem* current = minHeap_.top();
// IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
if (current->type != HeapItem::ITERATOR ||
!current->iter.IsDeleteRangeSentinelKey()) {
return;
}
// range tombstone start keys from the same SSTable should have been
// exhausted
assert(!range_tombstone_iters_[current->level] ||
!range_tombstone_iters_[current->level]->Valid());
// current->iter is a LevelIterator, and it enters a new SST file in the
// Next() call here.
current->iter.Next();
if (current->iter.Valid()) {
assert(current->iter.status().ok());
minHeap_.replace_top(current);
} else {
minHeap_.pop();
}
if (range_tombstone_iters_[current->level]) {
InsertRangeTombstoneAtLevel(current->level);
}
}
}
void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
if (child->iter.Valid()) {
assert(child->iter.status().ok());
minHeap_.push(child);
} else {
considerStatus(child->iter.status());
}
}
InternalIterator* NewCompactionMergingIterator(
const InternalKeyComparator* comparator, InternalIterator** children, int n,
std::vector<std::pair<TruncatedRangeDelIterator*,
TruncatedRangeDelIterator***>>& range_tombstone_iters,
Arena* arena) {
assert(n >= 0);
if (n == 0) {
return NewEmptyInternalIterator<Slice>(arena);
} else {
if (arena == nullptr) {
return new CompactionMergingIterator(comparator, children, n,
false /* is_arena_mode */,
range_tombstone_iters);
} else {
auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
return new (mem) CompactionMergingIterator(comparator, children, n,
true /* is_arena_mode */,
range_tombstone_iters);
}
}
}
} // namespace ROCKSDB_NAMESPACE

View file

@ -0,0 +1,44 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "db/range_del_aggregator.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"
#include "table/merging_iterator.h"
namespace ROCKSDB_NAMESPACE {
/*
* This is a simplified version of MergingIterator and is specifically used for
* compaction. It merges the input `children` iterators into a sorted stream of
* keys. Range tombstone start keys are also emitted to prevent oversize
* compactions. For example, consider an L1 file with content [a, b), y, z,
* where [a, b) is a range tombstone and y and z are point keys. This could
* cause an oversize compaction as it can overlap with a wide range of key space
* in L2.
*
* CompactionMergingIterator emits range tombstone start keys from each LSM
* level's range tombstone iterator, and for each range tombstone
* [start,end)@seqno, the key will be start@seqno with op_type
* kTypeRangeDeletion unless truncated at file boundary (see detail in
* TruncatedRangeDelIterator::start_key()).
*
* Caller should use CompactionMergingIterator::IsDeleteRangeSentinelKey() to
* check if the current key is a range tombstone key.
* TODO(cbi): IsDeleteRangeSentinelKey() is used for two kinds of keys at
* different layers: file boundary and range tombstone keys. Separate them into
* two APIs for clarity.
*/
class CompactionMergingIterator;
InternalIterator* NewCompactionMergingIterator(
const InternalKeyComparator* comparator, InternalIterator** children, int n,
std::vector<std::pair<TruncatedRangeDelIterator*,
TruncatedRangeDelIterator***>>& range_tombstone_iters,
Arena* arena = nullptr);
} // namespace ROCKSDB_NAMESPACE

View file

@ -10,121 +10,8 @@
#include "table/merging_iterator.h" #include "table/merging_iterator.h"
#include "db/arena_wrapped_db_iter.h" #include "db/arena_wrapped_db_iter.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "memory/arena.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
#include "test_util/sync_point.h"
#include "util/autovector.h"
#include "util/heap.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// For merging iterator to process range tombstones, we treat the start and end
// keys of a range tombstone as point keys and put them into the minHeap/maxHeap
// used in merging iterator. Take minHeap for example, we are able to keep track
// of currently "active" range tombstones (the ones whose start keys are popped
// but end keys are still in the heap) in `active_`. This `active_` set of range
// tombstones is then used to quickly determine whether the point key at heap
// top is deleted (by heap property, the point key at heap top must be within
// internal key range of active range tombstones).
//
// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
// point key and the start and end keys of a range tombstone.
struct HeapItem {
HeapItem() = default;
enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
IteratorWrapper iter;
size_t level = 0;
ParsedInternalKey parsed_ikey;
// Will be overwritten before use, initialize here so compiler does not
// complain.
Type type = ITERATOR;
explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
: level(_level), type(Type::ITERATOR) {
iter.Set(_iter);
}
void SetTombstoneKey(ParsedInternalKey&& pik) {
// op_type is already initialized in MergingIterator::Finish().
parsed_ikey.user_key = pik.user_key;
parsed_ikey.sequence = pik.sequence;
}
Slice key() const {
assert(type == ITERATOR);
return iter.key();
}
bool IsDeleteRangeSentinelKey() const {
if (type == Type::ITERATOR) {
return iter.IsDeleteRangeSentinelKey();
}
return false;
}
};
class MinHeapItemComparator {
public:
MinHeapItemComparator(const InternalKeyComparator* comparator)
: comparator_(comparator) {}
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->key(), b->key()) > 0;
} else {
return comparator_->Compare(a->key(), b->parsed_ikey) > 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->parsed_ikey, b->key()) > 0;
} else {
return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) > 0;
}
}
}
private:
const InternalKeyComparator* comparator_;
};
class MaxHeapItemComparator {
public:
MaxHeapItemComparator(const InternalKeyComparator* comparator)
: comparator_(comparator) {}
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->key(), b->key()) < 0;
} else {
return comparator_->Compare(a->key(), b->parsed_ikey) < 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->parsed_ikey, b->key()) < 0;
} else {
return comparator_->Compare(a->parsed_ikey, b->parsed_ikey) < 0;
}
}
}
private:
const InternalKeyComparator* comparator_;
};
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
using MergerMaxIterHeap = BinaryHeap<HeapItem*, MaxHeapItemComparator>;
} // namespace
class MergingIterator : public InternalIterator { class MergingIterator : public InternalIterator {
public: public:
MergingIterator(const InternalKeyComparator* comparator, MergingIterator(const InternalKeyComparator* comparator,
@ -136,7 +23,7 @@ class MergingIterator : public InternalIterator {
direction_(kForward), direction_(kForward),
comparator_(comparator), comparator_(comparator),
current_(nullptr), current_(nullptr),
minHeap_(comparator_), minHeap_(MinHeapItemComparator(comparator_)),
pinned_iters_mgr_(nullptr), pinned_iters_mgr_(nullptr),
iterate_upper_bound_(iterate_upper_bound) { iterate_upper_bound_(iterate_upper_bound) {
children_.resize(n); children_.resize(n);
@ -199,7 +86,7 @@ class MergingIterator : public InternalIterator {
// TruncatedRangeDelIterator since untruncated tombstone end points // TruncatedRangeDelIterator since untruncated tombstone end points
// always have kMaxSequenceNumber and kTypeRangeDeletion (see // always have kMaxSequenceNumber and kTypeRangeDeletion (see
// TruncatedRangeDelIterator::start_key()/end_key()). // TruncatedRangeDelIterator::start_key()/end_key()).
pinned_heap_item_[i].parsed_ikey.type = kTypeMaxValid; pinned_heap_item_[i].tombstone_pik.type = kTypeMaxValid;
} }
} }
} }
@ -549,6 +436,92 @@ class MergingIterator : public InternalIterator {
} }
private: private:
// For merging iterator to process range tombstones, we treat the start and
// end
// keys of a range tombstone as point keys and put them into the
// minHeap/maxHeap used in merging iterator. Take minHeap for example, we are
// able to keep track of currently "active" range tombstones (the ones whose
// start keys are popped but end keys are still in the heap) in `active_`.
// This `active_` set of range tombstones is then used to quickly determine
// whether the point key at heap top is deleted (by heap property, the point
// key at heap top must be within internal key range of active range
// tombstones).
//
// The HeapItem struct represents 3 types of elements in the minHeap/maxHeap:
// point key and the start and end keys of a range tombstone.
struct HeapItem {
HeapItem() = default;
IteratorWrapper iter;
size_t level = 0;
ParsedInternalKey tombstone_pik;
// Will be overwritten before use, initialize here so compiler does not
// complain.
enum Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END };
Type type = ITERATOR;
explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
: level(_level), type(Type::ITERATOR) {
iter.Set(_iter);
}
void SetTombstoneKey(ParsedInternalKey&& pik) {
// op_type is already initialized in MergingIterator::Finish().
tombstone_pik.user_key = pik.user_key;
tombstone_pik.sequence = pik.sequence;
}
};
class MinHeapItemComparator {
public:
explicit MinHeapItemComparator(const InternalKeyComparator* comparator)
: comparator_(comparator) {}
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->iter.key(), b->iter.key()) > 0;
} else {
return comparator_->Compare(a->iter.key(), b->tombstone_pik) > 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->tombstone_pik, b->iter.key()) > 0;
} else {
return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) > 0;
}
}
}
private:
const InternalKeyComparator* comparator_;
};
class MaxHeapItemComparator {
public:
explicit MaxHeapItemComparator(const InternalKeyComparator* comparator)
: comparator_(comparator) {}
bool operator()(HeapItem* a, HeapItem* b) const {
if (LIKELY(a->type == HeapItem::ITERATOR)) {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->iter.key(), b->iter.key()) < 0;
} else {
return comparator_->Compare(a->iter.key(), b->tombstone_pik) < 0;
}
} else {
if (LIKELY(b->type == HeapItem::ITERATOR)) {
return comparator_->Compare(a->tombstone_pik, b->iter.key()) < 0;
} else {
return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) < 0;
}
}
}
private:
const InternalKeyComparator* comparator_;
};
using MergerMinIterHeap = BinaryHeap<HeapItem*, MinHeapItemComparator>;
using MergerMaxIterHeap = BinaryHeap<HeapItem*, MaxHeapItemComparator>;
friend class MergeIteratorBuilder; friend class MergeIteratorBuilder;
// Clears heaps for both directions, used when changing direction or seeking // Clears heaps for both directions, used when changing direction or seeking
void ClearHeaps(bool clear_active = true); void ClearHeaps(bool clear_active = true);
@ -1177,7 +1150,7 @@ void MergingIterator::SwitchToForward() {
if (child.iter.status() == Status::TryAgain()) { if (child.iter.status() == Status::TryAgain()) {
continue; continue;
} }
if (child.iter.Valid() && comparator_->Equal(target, child.key())) { if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok()); assert(child.iter.status().ok());
child.iter.Next(); child.iter.Next();
} }
@ -1188,7 +1161,7 @@ void MergingIterator::SwitchToForward() {
for (auto& child : children_) { for (auto& child : children_) {
if (child.iter.status() == Status::TryAgain()) { if (child.iter.status() == Status::TryAgain()) {
child.iter.Seek(target); child.iter.Seek(target);
if (child.iter.Valid() && comparator_->Equal(target, child.key())) { if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok()); assert(child.iter.status().ok());
child.iter.Next(); child.iter.Next();
} }
@ -1239,7 +1212,7 @@ void MergingIterator::SwitchToBackward() {
if (&child.iter != current_) { if (&child.iter != current_) {
child.iter.SeekForPrev(target); child.iter.SeekForPrev(target);
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
if (child.iter.Valid() && comparator_->Equal(target, child.key())) { if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) {
assert(child.iter.status().ok()); assert(child.iter.status().ok());
child.iter.Prev(); child.iter.Prev();
} }
@ -1297,7 +1270,8 @@ void MergingIterator::ClearHeaps(bool clear_active) {
void MergingIterator::InitMaxHeap() { void MergingIterator::InitMaxHeap() {
if (!maxHeap_) { if (!maxHeap_) {
maxHeap_ = std::make_unique<MergerMaxIterHeap>(comparator_); maxHeap_ =
std::make_unique<MergerMaxIterHeap>(MaxHeapItemComparator(comparator_));
} }
} }
@ -1308,11 +1282,13 @@ void MergingIterator::InitMaxHeap() {
// key's level, then the current child iterator is simply advanced to its next // key's level, then the current child iterator is simply advanced to its next
// key without reseeking. // key without reseeking.
inline void MergingIterator::FindNextVisibleKey() { inline void MergingIterator::FindNextVisibleKey() {
// When active_ is empty, we know heap top cannot be a range tombstone end
// key. It cannot be a range tombstone start key per PopDeleteRangeStart().
PopDeleteRangeStart(); PopDeleteRangeStart();
while (!minHeap_.empty() && // PopDeleteRangeStart() implies heap top is not DELETE_RANGE_START
(!active_.empty() || minHeap_.top()->IsDeleteRangeSentinelKey()) && // active_ being empty implies no DELETE_RANGE_END in heap.
// So minHeap_->top() must be of type ITERATOR.
while (
!minHeap_.empty() &&
(!active_.empty() || minHeap_.top()->iter.IsDeleteRangeSentinelKey()) &&
SkipNextDeleted()) { SkipNextDeleted()) {
PopDeleteRangeStart(); PopDeleteRangeStart();
} }
@ -1320,8 +1296,12 @@ inline void MergingIterator::FindNextVisibleKey() {
inline void MergingIterator::FindPrevVisibleKey() { inline void MergingIterator::FindPrevVisibleKey() {
PopDeleteRangeEnd(); PopDeleteRangeEnd();
while (!maxHeap_->empty() && // PopDeleteRangeEnd() implies heap top is not DELETE_RANGE_END
(!active_.empty() || maxHeap_->top()->IsDeleteRangeSentinelKey()) && // active_ being empty implies no DELETE_RANGE_START in heap.
// So maxHeap_->top() must be of type ITERATOR.
while (
!maxHeap_->empty() &&
(!active_.empty() || maxHeap_->top()->iter.IsDeleteRangeSentinelKey()) &&
SkipPrevDeleted()) { SkipPrevDeleted()) {
PopDeleteRangeEnd(); PopDeleteRangeEnd();
} }

View file

@ -12,6 +12,7 @@
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "table/iterator_wrapper.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {