diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a4bdb6b22..dd8adb9d37 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -503,6 +503,7 @@ set(SOURCES db/merge_helper.cc db/merge_operator.cc db/range_del_aggregator.cc + db/range_del_aggregator_v2.cc db/range_tombstone_fragmenter.cc db/repair.cc db/snapshot_impl.cc @@ -904,6 +905,7 @@ if(WITH_TESTS) db/plain_table_db_test.cc db/prefix_test.cc db/range_del_aggregator_test.cc + db/range_del_aggregator_v2_test.cc db/range_tombstone_fragmenter_test.cc db/repair_test.cc db/table_properties_collector_test.cc diff --git a/Makefile b/Makefile index 53fecf58ea..5d68772825 100644 --- a/Makefile +++ b/Makefile @@ -554,6 +554,7 @@ TESTS = \ trace_analyzer_test \ repeatable_thread_test \ range_tombstone_fragmenter_test \ + range_del_aggregator_v2_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1586,6 +1587,9 @@ repeatable_thread_test: util/repeatable_thread_test.o $(LIBOBJECTS) $(TESTHARNES range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +range_del_aggregator_v2_test: db/range_del_aggregator_v2_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 5040ff1539..5ee4fe2f8f 100644 --- a/TARGETS +++ b/TARGETS @@ -123,6 +123,7 @@ cpp_library( "db/merge_helper.cc", "db/merge_operator.cc", "db/range_del_aggregator.cc", + "db/range_del_aggregator_v2.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", "db/snapshot_impl.cc", @@ -932,6 +933,11 @@ ROCKS_TESTS = [ "db/range_del_aggregator_test.cc", "serial", ], + [ + "range_del_aggregator_v2_test", + "db/range_del_aggregator_v2_test.cc", + "serial", + ], [ "range_tombstone_fragmenter_test", "db/range_tombstone_fragmenter_test.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 70bc9e8983..30d39307a3 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -24,6 +24,7 @@ #include "db/db_impl.h" #include "db/internal_stats.h" #include "db/job_context.h" +#include "db/range_del_aggregator_v2.h" #include "db/table_properties_collector.h" #include "db/version_set.h" #include "db/write_controller.h" @@ -950,9 +951,8 @@ Status ColumnFamilyData::RangesOverlapWithMemtables( } super_version->imm->AddRangeTombstoneIterators(read_opts, &memtable_range_del_iters); - RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */, - false /* collapse_deletions */); - Status status; + RangeDelAggregatorV2 range_del_agg(&internal_comparator_, + kMaxSequenceNumber /* upper_bound */); { std::unique_ptr memtable_range_del_iter( NewMergingIterator(&internal_comparator_, @@ -960,8 +960,9 @@ Status ColumnFamilyData::RangesOverlapWithMemtables( ? nullptr : &memtable_range_del_iters[0], static_cast(memtable_range_del_iters.size()))); - status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter)); + range_del_agg.AddUnfragmentedTombstones(std::move(memtable_range_del_iter)); } + Status status; for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) { auto* vstorage = super_version->current->storage_info(); auto* ucmp = vstorage->InternalComparator()->user_comparator(); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 7fd4f9c722..8a878fe725 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -36,6 +36,7 @@ #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" +#include "db/range_del_aggregator_v2.h" #include "db/version_set.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" @@ -804,10 +805,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - std::unique_ptr range_del_agg( - new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_)); + RangeDelAggregatorV2 range_del_agg_v2(&cfd->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); + auto* range_del_agg = + range_del_agg_v2.DelegateToRangeDelAggregator(existing_snapshots_); + + // Although the v2 aggregator is what the level iterator(s) know about, + // the AddTombstones calls will be propagated down to the v1 aggregator. std::unique_ptr input(versions_->MakeInputIterator( - sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_)); + sub_compact->compaction, &range_del_agg_v2, env_optiosn_for_read_)); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); @@ -896,8 +902,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false, - range_del_agg.get(), sub_compact->compaction, compaction_filter, - shutting_down_, preserve_deletes_seqnum_)); + range_del_agg, sub_compact->compaction, compaction_filter, shutting_down_, + preserve_deletes_seqnum_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { @@ -1034,9 +1040,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { next_key = &c_iter->key(); } CompactionIterationStats range_del_out_stats; - status = FinishCompactionOutputFile(input_status, sub_compact, - range_del_agg.get(), - &range_del_out_stats, next_key); + status = + FinishCompactionOutputFile(input_status, sub_compact, range_del_agg, + &range_del_out_stats, next_key); RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); if (sub_compact->outputs.size() == 1) { @@ -1096,8 +1102,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // close the output file. if (sub_compact->builder != nullptr) { CompactionIterationStats range_del_out_stats; - Status s = FinishCompactionOutputFile( - status, sub_compact, range_del_agg.get(), &range_del_out_stats); + Status s = FinishCompactionOutputFile(status, sub_compact, range_del_agg, + &range_del_out_stats); if (status.ok()) { status = s; } diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index 0c906948e6..1b94930318 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -340,7 +340,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { Arena arena; { InternalKeyComparator icmp(options.comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter( dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); iter->SeekToFirst(); @@ -429,7 +430,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { count = 0; { InternalKeyComparator icmp(options.comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter( dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); iter->SeekToFirst(); @@ -646,7 +648,8 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { int total = 0; Arena arena; InternalKeyComparator icmp(options.comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* snapshots */); ScopedArenaIterator iter( dbfull()->NewInternalIterator(&arena, &range_del_agg)); iter->SeekToFirst(); @@ -848,7 +851,7 @@ TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) { DestroyAndReopen(options); Put("0000000010", "v10"); - Put("0000000020", "v20"); // skipped + Put("0000000020", "v20"); // skipped Put("0000000050", "v50"); Flush(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 68f03d6c77..61a55de5d2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1032,7 +1032,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { } InternalIterator* DBImpl::NewInternalIterator( - Arena* arena, RangeDelAggregator* range_del_agg, + Arena* arena, RangeDelAggregatorV2* range_del_agg, ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; if (column_family == nullptr) { @@ -1152,7 +1152,7 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) { InternalIterator* DBImpl::NewInternalIterator( const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena, - RangeDelAggregator* range_del_agg) { + RangeDelAggregatorV2* range_del_agg) { InternalIterator* internal_iter; assert(arena != nullptr); assert(range_del_agg != nullptr); @@ -1169,7 +1169,7 @@ InternalIterator* DBImpl::NewInternalIterator( if (!read_options.ignore_range_deletions) { range_del_iter.reset( super_version->mem->NewRangeTombstoneIterator(read_options)); - s = range_del_agg->AddTombstones(std::move(range_del_iter)); + range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter)); } // Collect all needed child iterators for immutable memtables if (s.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index ef108aaff6..d8740cf950 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -31,6 +31,7 @@ #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" #include "db/pre_release_callback.h" +#include "db/range_del_aggregator_v2.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "db/snapshot_impl.h" @@ -373,7 +374,7 @@ class DBImpl : public DB { // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. InternalIterator* NewInternalIterator( - Arena* arena, RangeDelAggregator* range_del_agg, + Arena* arena, RangeDelAggregatorV2* range_del_agg, ColumnFamilyHandle* column_family = nullptr); LogsWithPrepTracker* logs_with_prep_tracker() { @@ -581,7 +582,7 @@ class DBImpl : public DB { ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena, - RangeDelAggregator* range_del_agg); + RangeDelAggregatorV2* range_del_agg); // hollow transactions shell used for recovery. // these will then be passed to TransactionDB so that diff --git a/db/db_iter.cc b/db/db_iter.cc index 6bca929821..f36a41fc26 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -134,8 +134,7 @@ class DBIter final: public Iterator { prefix_same_as_start_(read_options.prefix_same_as_start), pin_thru_lifetime_(read_options.pin_data), total_order_seek_(read_options.total_order_seek), - range_del_agg_(cf_options.internal_comparator, s, - true /* collapse_deletions */), + range_del_agg_(&cf_options.internal_comparator, s), read_callback_(read_callback), db_impl_(db_impl), cfd_(cfd), @@ -172,7 +171,7 @@ class DBIter final: public Iterator { iter_ = iter; iter_->SetPinnedItersMgr(&pinned_iters_mgr_); } - virtual RangeDelAggregator* GetRangeDelAggregator() { + virtual RangeDelAggregatorV2* GetRangeDelAggregator() { return &range_del_agg_; } @@ -342,7 +341,7 @@ class DBIter final: public Iterator { const bool total_order_seek_; // List of operands for merge operator. MergeContext merge_context_; - RangeDelAggregator range_del_agg_; + RangeDelAggregatorV2 range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; ReadCallback* read_callback_; @@ -1480,7 +1479,7 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } -RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() { +RangeDelAggregatorV2* ArenaWrappedDBIter::GetRangeDelAggregator() { return db_iter_->GetRangeDelAggregator(); } diff --git a/db/db_iter.h b/db/db_iter.h index 8e18f03fc3..3d359bbb1e 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -12,7 +12,7 @@ #include #include "db/db_impl.h" #include "db/dbformat.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "options/cf_options.h" #include "rocksdb/db.h" #include "rocksdb/iterator.h" @@ -48,7 +48,7 @@ class ArenaWrappedDBIter : public Iterator { // Get the arena to be used to allocate memory for DBIter to be wrapped, // as well as child iterators in it. virtual Arena* GetArena() { return &arena_; } - virtual RangeDelAggregator* GetRangeDelAggregator(); + virtual RangeDelAggregatorV2* GetRangeDelAggregator(); // Set the internal iterator wrapped inside the DB Iterator. Usually it is // a merging iterator. diff --git a/db/db_test_util.cc b/db/db_test_util.cc index fd4d44144f..93e5e07ead 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -814,7 +814,8 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) { Arena arena; auto options = CurrentOptions(); InternalKeyComparator icmp(options.comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); ScopedArenaIterator iter; if (cf == 0) { iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg)); @@ -1225,7 +1226,8 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) { Arena arena; auto options = CurrentOptions(); InternalKeyComparator icmp(options.comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); // This should be defined after range_del_agg so that it destructs the // assigned iterator before it range_del_agg is already destructed. ScopedArenaIterator iter; @@ -1433,7 +1435,8 @@ void DBTestBase::VerifyDBInternal( std::vector> true_data) { Arena arena; InternalKeyComparator icmp(last_options_.comparator); - RangeDelAggregator range_del_agg(icmp, {}); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg); iter->SeekToFirst(); for (auto p : true_data) { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 01afac79ee..96b514596d 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -15,6 +15,7 @@ #include "db/db_iter.h" #include "db/dbformat.h" #include "db/job_context.h" +#include "db/range_del_aggregator_v2.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" @@ -71,8 +72,8 @@ class ForwardLevelIterator : public InternalIterator { delete file_iter_; } - RangeDelAggregator range_del_agg( - cfd_->internal_comparator(), {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); file_iter_ = cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), *files_[file_index_], @@ -608,14 +609,14 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { // New sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); } - RangeDelAggregator range_del_agg( - cfd_->internal_comparator(), {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); if (!read_options_.ignore_range_deletions) { std::unique_ptr range_del_iter( sv_->mem->NewRangeTombstoneIterator(read_options_)); - range_del_agg.AddTombstones(std::move(range_del_iter)); + range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter)); sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_, &range_del_agg); } @@ -666,12 +667,12 @@ void ForwardIterator::RenewIterators() { mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); - RangeDelAggregator range_del_agg( - cfd_->internal_comparator(), {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&cfd_->internal_comparator(), + kMaxSequenceNumber /* upper_bound */); if (!read_options_.ignore_range_deletions) { std::unique_ptr range_del_iter( svnew->mem->NewRangeTombstoneIterator(read_options_)); - range_del_agg.AddTombstones(std::move(range_del_iter)); + range_del_agg.AddUnfragmentedTombstones(std::move(range_del_iter)); svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_, &range_del_agg); } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 069487f546..83da2ca8e0 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -158,15 +158,12 @@ bool MemTableListVersion::GetFromList( Status MemTableListVersion::AddRangeTombstoneIterators( const ReadOptions& read_opts, Arena* /*arena*/, - RangeDelAggregator* range_del_agg) { + RangeDelAggregatorV2* range_del_agg) { assert(range_del_agg != nullptr); for (auto& m : memlist_) { std::unique_ptr range_del_iter( m->NewRangeTombstoneIterator(read_opts)); - Status s = range_del_agg->AddTombstones(std::move(range_del_iter)); - if (!s.ok()) { - return s; - } + range_del_agg->AddUnfragmentedTombstones(std::move(range_del_iter)); } return Status::OK(); } diff --git a/db/memtable_list.h b/db/memtable_list.h index 8f23def13c..a6352ae7f1 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -15,7 +15,7 @@ #include "db/dbformat.h" #include "db/logs_with_prep_tracker.h" #include "db/memtable.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "monitoring/instrumented_mutex.h" #include "rocksdb/db.h" #include "rocksdb/iterator.h" @@ -91,7 +91,7 @@ class MemTableListVersion { } Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, - RangeDelAggregator* range_del_agg); + RangeDelAggregatorV2* range_del_agg); Status AddRangeTombstoneIterators( const ReadOptions& read_opts, std::vector* range_del_iters); diff --git a/db/range_del_aggregator_bench.cc b/db/range_del_aggregator_bench.cc index 2e7f9de3fc..f8efba24f4 100644 --- a/db/range_del_aggregator_bench.cc +++ b/db/range_del_aggregator_bench.cc @@ -20,6 +20,8 @@ int main() { #include #include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" +#include "db/range_tombstone_fragmenter.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "util/coding.h" @@ -33,7 +35,7 @@ using GFLAGS_NAMESPACE::ParseCommandLineFlags; DEFINE_int32(num_range_tombstones, 1000, "number of range tombstones created"); -DEFINE_int32(num_runs, 10000, "number of test runs"); +DEFINE_int32(num_runs, 1000, "number of test runs"); DEFINE_int32(tombstone_start_upper_bound, 1000, "exclusive upper bound on range tombstone start keys"); @@ -55,6 +57,8 @@ DEFINE_int32(should_deletes_per_run, 1, "number of ShouldDelete calls per run"); DEFINE_int32(add_tombstones_per_run, 1, "number of AddTombstones calls per run"); +DEFINE_bool(use_v2_aggregator, false, "benchmark RangeDelAggregatorV2"); + namespace { struct Stats { @@ -85,6 +89,8 @@ std::ostream& operator<<(std::ostream& os, const Stats& s) { return os; } +auto icmp = rocksdb::InternalKeyComparator(rocksdb::BytewiseComparator()); + } // anonymous namespace namespace rocksdb { @@ -186,9 +192,13 @@ int main(int argc, char** argv) { : rocksdb::RangeDelPositioningMode::kFullScan; for (int i = 0; i < FLAGS_num_runs; i++) { - auto icmp = rocksdb::InternalKeyComparator(rocksdb::BytewiseComparator()); rocksdb::RangeDelAggregator range_del_agg(icmp, {} /* snapshots */, FLAGS_use_collapsed); + rocksdb::RangeDelAggregatorV2 range_del_agg_v2( + &icmp, rocksdb::kMaxSequenceNumber /* upper_bound */); + + std::vector > + fragmented_range_tombstone_lists(FLAGS_add_tombstones_per_run); for (auto& persistent_range_tombstones : all_persistent_range_tombstones) { // TODO(abhimadan): consider whether creating the range tombstones right @@ -203,10 +213,27 @@ int main(int argc, char** argv) { auto range_del_iter = rocksdb::MakeRangeDelIterator(persistent_range_tombstones); - rocksdb::StopWatchNano stop_watch_add_tombstones(rocksdb::Env::Default(), - true /* auto_start */); - range_del_agg.AddTombstones(std::move(range_del_iter)); - stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos(); + fragmented_range_tombstone_lists.emplace_back( + new rocksdb::FragmentedRangeTombstoneList( + rocksdb::MakeRangeDelIterator(persistent_range_tombstones), icmp, + false /* one_time_use */)); + std::unique_ptr + fragmented_range_del_iter( + new rocksdb::FragmentedRangeTombstoneIterator( + fragmented_range_tombstone_lists.back().get(), + rocksdb::kMaxSequenceNumber, icmp)); + + if (FLAGS_use_v2_aggregator) { + rocksdb::StopWatchNano stop_watch_add_tombstones( + rocksdb::Env::Default(), true /* auto_start */); + range_del_agg_v2.AddTombstones(std::move(fragmented_range_del_iter)); + stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos(); + } else { + rocksdb::StopWatchNano stop_watch_add_tombstones( + rocksdb::Env::Default(), true /* auto_start */); + range_del_agg.AddTombstones(std::move(range_del_iter)); + stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos(); + } } rocksdb::ParsedInternalKey parsed_key; @@ -220,10 +247,18 @@ int main(int argc, char** argv) { std::string key_string = rocksdb::Key(first_key + j); parsed_key.user_key = key_string; - rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(), - true /* auto_start */); - range_del_agg.ShouldDelete(parsed_key, mode); - uint64_t call_time = stop_watch_should_delete.ElapsedNanos(); + uint64_t call_time; + if (FLAGS_use_v2_aggregator) { + rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(), + true /* auto_start */); + range_del_agg_v2.ShouldDelete(parsed_key, mode); + call_time = stop_watch_should_delete.ElapsedNanos(); + } else { + rocksdb::StopWatchNano stop_watch_should_delete(rocksdb::Env::Default(), + true /* auto_start */); + range_del_agg.ShouldDelete(parsed_key, mode); + call_time = stop_watch_should_delete.ElapsedNanos(); + } if (j == 0) { stats.time_first_should_delete += call_time; diff --git a/db/range_del_aggregator_v2.cc b/db/range_del_aggregator_v2.cc new file mode 100644 index 0000000000..015a5b2d82 --- /dev/null +++ b/db/range_del_aggregator_v2.cc @@ -0,0 +1,195 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/range_del_aggregator_v2.h" + +#include "db/compaction_iteration_stats.h" +#include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" +#include "db/range_del_aggregator.h" +#include "db/range_tombstone_fragmenter.h" +#include "db/version_edit.h" +#include "include/rocksdb/comparator.h" +#include "include/rocksdb/types.h" +#include "table/internal_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/table_builder.h" +#include "util/heap.h" +#include "util/kv_map.h" +#include "util/vector_iterator.h" + +namespace rocksdb { + +TruncatedRangeDelIterator::TruncatedRangeDelIterator( + std::unique_ptr iter, + const InternalKeyComparator* icmp, const InternalKey* smallest, + const InternalKey* largest) + : iter_(std::move(iter)), icmp_(icmp) { + if (smallest != nullptr) { + pinned_bounds_.emplace_back(); + auto& parsed_smallest = pinned_bounds_.back(); + if (!ParseInternalKey(smallest->Encode(), &parsed_smallest)) { + assert(false); + } + smallest_ = &parsed_smallest; + } + if (largest != nullptr) { + pinned_bounds_.emplace_back(); + auto& parsed_largest = pinned_bounds_.back(); + if (!ParseInternalKey(largest->Encode(), &parsed_largest)) { + assert(false); + } + if (parsed_largest.type == kTypeRangeDeletion && + parsed_largest.sequence == kMaxSequenceNumber) { + // The file boundary has been artificially extended by a range tombstone. + // We do not need to adjust largest to properly truncate range + // tombstones that extend past the boundary. + } else if (parsed_largest.sequence == 0) { + // The largest key in the sstable has a sequence number of 0. Since we + // guarantee that no internal keys with the same user key and sequence + // number can exist in a DB, we know that the largest key in this sstable + // cannot exist as the smallest key in the next sstable. This further + // implies that no range tombstone in this sstable covers largest; + // otherwise, the file boundary would have been artificially extended. + // + // Therefore, we will never truncate a range tombstone at largest, so we + // can leave it unchanged. + } else { + // The same user key may straddle two sstable boundaries. To ensure that + // the truncated end key can cover the largest key in this sstable, reduce + // its sequence number by 1. + parsed_largest.sequence -= 1; + } + largest_ = &parsed_largest; + } +} + +bool TruncatedRangeDelIterator::Valid() const { + return iter_->Valid() && + (smallest_ == nullptr || + icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) && + (largest_ == nullptr || + icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0); +} + +void TruncatedRangeDelIterator::Next() { iter_->TopNext(); } + +void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); } + +// NOTE: target is a user key +void TruncatedRangeDelIterator::Seek(const Slice& target) { + if (largest_ != nullptr && + icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber, + kTypeRangeDeletion)) <= 0) { + iter_->Invalidate(); + return; + } + iter_->Seek(target); +} + +// NOTE: target is a user key +void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) { + if (smallest_ != nullptr && + icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion), + *smallest_) < 0) { + iter_->Invalidate(); + return; + } + iter_->SeekForPrev(target); +} + +void TruncatedRangeDelIterator::SeekToFirst() { iter_->SeekToTopFirst(); } + +void TruncatedRangeDelIterator::SeekToLast() { iter_->SeekToTopLast(); } + +RangeDelAggregatorV2::RangeDelAggregatorV2(const InternalKeyComparator* icmp, + SequenceNumber upper_bound) + : icmp_(icmp), upper_bound_(upper_bound) {} + +void RangeDelAggregatorV2::AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest, const InternalKey* largest) { + if (input_iter == nullptr || input_iter->empty()) { + return; + } + if (wrapped_range_del_agg != nullptr) { + wrapped_range_del_agg->AddTombstones(std::move(input_iter), smallest, + largest); + // TODO: this eats the status of the wrapped call; may want to propagate it + return; + } + iters_.emplace_back(new TruncatedRangeDelIterator(std::move(input_iter), + icmp_, smallest, largest)); +} + +void RangeDelAggregatorV2::AddUnfragmentedTombstones( + std::unique_ptr input_iter) { + assert(wrapped_range_del_agg == nullptr); + if (input_iter == nullptr) { + return; + } + pinned_fragments_.emplace_back(new FragmentedRangeTombstoneList( + std::move(input_iter), *icmp_, false /* one_time_use */)); + auto fragmented_iter = new FragmentedRangeTombstoneIterator( + pinned_fragments_.back().get(), upper_bound_, *icmp_); + AddTombstones( + std::unique_ptr(fragmented_iter)); +} + +bool RangeDelAggregatorV2::ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode) { + if (wrapped_range_del_agg != nullptr) { + return wrapped_range_del_agg->ShouldDelete(parsed, mode); + } + // TODO: avoid re-seeking every call + for (auto& iter : iters_) { + iter->Seek(parsed.user_key); + if (iter->Valid() && icmp_->Compare(iter->start_key(), parsed) <= 0 && + iter->seq() > parsed.sequence) { + return true; + } + } + return false; +} + +bool RangeDelAggregatorV2::IsRangeOverlapped(const Slice& start, + const Slice& end) { + assert(wrapped_range_del_agg == nullptr); + + // Set the internal start/end keys so that: + // - if start_ikey has the same user key and sequence number as the current + // end key, start_ikey will be considered greater; and + // - if end_ikey has the same user key and sequence number as the current + // start key, end_ikey will be considered greater. + ParsedInternalKey start_ikey(start, kMaxSequenceNumber, + static_cast(0)); + ParsedInternalKey end_ikey(end, 0, static_cast(0)); + for (auto& iter : iters_) { + bool checked_candidate_tombstones = false; + for (iter->SeekForPrev(start); + iter->Valid() && icmp_->Compare(iter->start_key(), end_ikey) <= 0; + iter->Next()) { + checked_candidate_tombstones = true; + if (icmp_->Compare(start_ikey, iter->end_key()) < 0 && + icmp_->Compare(iter->start_key(), end_ikey) <= 0) { + return true; + } + } + + if (!checked_candidate_tombstones) { + // Do an additional check for when the end of the range is the begin key + // of a tombstone, which we missed earlier since SeekForPrev'ing to the + // start was invalid. + iter->SeekForPrev(end); + if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 && + icmp_->Compare(iter->start_key(), end_ikey) <= 0) { + return true; + } + } + } + return false; +} + +} // namespace rocksdb diff --git a/db/range_del_aggregator_v2.h b/db/range_del_aggregator_v2.h new file mode 100644 index 0000000000..6689fad506 --- /dev/null +++ b/db/range_del_aggregator_v2.h @@ -0,0 +1,134 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include +#include +#include + +#include "db/compaction_iteration_stats.h" +#include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" +#include "db/range_del_aggregator.h" +#include "db/range_tombstone_fragmenter.h" +#include "db/version_edit.h" +#include "include/rocksdb/comparator.h" +#include "include/rocksdb/types.h" +#include "table/internal_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/table_builder.h" +#include "util/heap.h" +#include "util/kv_map.h" + +namespace rocksdb { + +class RangeDelAggregatorV2; + +class TruncatedRangeDelIterator { + public: + TruncatedRangeDelIterator( + std::unique_ptr iter, + const InternalKeyComparator* icmp, const InternalKey* smallest, + const InternalKey* largest); + + bool Valid() const; + + void Next(); + void Prev(); + + // Seeks to the tombstone with the highest viisble sequence number that covers + // target (a user key). If no such tombstone exists, the position will be at + // the earliest tombstone that ends after target. + void Seek(const Slice& target); + + // Seeks to the tombstone with the highest viisble sequence number that covers + // target (a user key). If no such tombstone exists, the position will be at + // the latest tombstone that starts before target. + void SeekForPrev(const Slice& target); + + void SeekToFirst(); + void SeekToLast(); + + ParsedInternalKey start_key() const { + return (smallest_ == nullptr || + icmp_->Compare(*smallest_, iter_->parsed_start_key()) <= 0) + ? iter_->parsed_start_key() + : *smallest_; + } + + ParsedInternalKey end_key() const { + return (largest_ == nullptr || + icmp_->Compare(iter_->parsed_end_key(), *largest_) <= 0) + ? iter_->parsed_end_key() + : *largest_; + } + + SequenceNumber seq() const { return iter_->seq(); } + + private: + std::unique_ptr iter_; + const InternalKeyComparator* icmp_; + const ParsedInternalKey* smallest_ = nullptr; + const ParsedInternalKey* largest_ = nullptr; + std::list pinned_bounds_; +}; + +class RangeDelAggregatorV2 { + public: + RangeDelAggregatorV2(const InternalKeyComparator* icmp, + SequenceNumber upper_bound); + + void AddTombstones( + std::unique_ptr input_iter, + const InternalKey* smallest = nullptr, + const InternalKey* largest = nullptr); + + void AddUnfragmentedTombstones(std::unique_ptr input_iter); + + bool ShouldDelete(const ParsedInternalKey& parsed, + RangeDelPositioningMode mode); + + bool IsRangeOverlapped(const Slice& start, const Slice& end); + + // TODO: no-op for now, but won't be once ShouldDelete leverages positioning + // mode and doesn't re-seek every ShouldDelete + void InvalidateRangeDelMapPositions() {} + + bool IsEmpty() const { return iters_.empty(); } + bool AddFile(uint64_t file_number) { + return files_seen_.insert(file_number).second; + } + + // Adaptor method to pass calls through to an old-style RangeDelAggregator. + // Will be removed once this new version supports an iterator that can be used + // during flush/compaction. + RangeDelAggregator* DelegateToRangeDelAggregator( + const std::vector& snapshots) { + wrapped_range_del_agg.reset(new RangeDelAggregator( + *icmp_, snapshots, true /* collapse_deletions */)); + return wrapped_range_del_agg.get(); + } + + std::unique_ptr NewIterator() { + assert(wrapped_range_del_agg != nullptr); + return wrapped_range_del_agg->NewIterator(); + } + + private: + const InternalKeyComparator* icmp_; + SequenceNumber upper_bound_; + + std::vector> iters_; + std::list> pinned_fragments_; + std::set files_seen_; + + // TODO: remove once V2 supports exposing tombstone iterators + std::unique_ptr wrapped_range_del_agg; +}; + +} // namespace rocksdb diff --git a/db/range_del_aggregator_v2_test.cc b/db/range_del_aggregator_v2_test.cc new file mode 100644 index 0000000000..6c9b51e358 --- /dev/null +++ b/db/range_del_aggregator_v2_test.cc @@ -0,0 +1,469 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/range_del_aggregator_v2.h" + +#include +#include +#include + +#include "db/db_test_util.h" +#include "db/dbformat.h" +#include "db/range_tombstone_fragmenter.h" +#include "util/testutil.h" + +namespace rocksdb { + +class RangeDelAggregatorV2Test : public testing::Test {}; + +namespace { + +static auto bytewise_icmp = InternalKeyComparator(BytewiseComparator()); + +std::unique_ptr MakeRangeDelIter( + const std::vector& range_dels) { + std::vector keys, values; + for (const auto& range_del : range_dels) { + auto key_and_value = range_del.Serialize(); + keys.push_back(key_and_value.first.Encode().ToString()); + values.push_back(key_and_value.second.ToString()); + } + return std::unique_ptr( + new test::VectorIterator(keys, values)); +} + +std::vector> +MakeFragmentedTombstoneLists( + const std::vector>& range_dels_list) { + std::vector> fragment_lists; + for (const auto& range_dels : range_dels_list) { + auto range_del_iter = MakeRangeDelIter(range_dels); + fragment_lists.emplace_back(new FragmentedRangeTombstoneList( + std::move(range_del_iter), bytewise_icmp, false /* one_time_use */)); + } + return fragment_lists; +} + +struct TruncatedIterScanTestCase { + ParsedInternalKey start; + ParsedInternalKey end; + SequenceNumber seq; +}; + +struct TruncatedIterSeekTestCase { + Slice target; + ParsedInternalKey start; + ParsedInternalKey end; + SequenceNumber seq; + bool invalid; +}; + +struct ShouldDeleteTestCase { + ParsedInternalKey lookup_key; + bool result; +}; + +struct IsRangeOverlappedTestCase { + Slice start; + Slice end; + bool result; +}; + +ParsedInternalKey UncutEndpoint(const Slice& s) { + return ParsedInternalKey(s, kMaxSequenceNumber, kTypeRangeDeletion); +} + +ParsedInternalKey InternalValue(const Slice& key, SequenceNumber seq) { + return ParsedInternalKey(key, seq, kTypeValue); +} + +void VerifyIterator( + TruncatedRangeDelIterator* iter, const InternalKeyComparator& icmp, + const std::vector& expected_range_dels) { + // Test forward iteration. + iter->SeekToFirst(); + for (size_t i = 0; i < expected_range_dels.size(); i++, iter->Next()) { + ASSERT_TRUE(iter->Valid()); + EXPECT_EQ(0, icmp.Compare(iter->start_key(), expected_range_dels[i].start)); + EXPECT_EQ(0, icmp.Compare(iter->end_key(), expected_range_dels[i].end)); + EXPECT_EQ(expected_range_dels[i].seq, iter->seq()); + } + EXPECT_FALSE(iter->Valid()); + + // Test reverse iteration. + iter->SeekToLast(); + std::vector reverse_expected_range_dels( + expected_range_dels.rbegin(), expected_range_dels.rend()); + for (size_t i = 0; i < reverse_expected_range_dels.size(); + i++, iter->Prev()) { + ASSERT_TRUE(iter->Valid()); + EXPECT_EQ(0, icmp.Compare(iter->start_key(), + reverse_expected_range_dels[i].start)); + EXPECT_EQ( + 0, icmp.Compare(iter->end_key(), reverse_expected_range_dels[i].end)); + EXPECT_EQ(reverse_expected_range_dels[i].seq, iter->seq()); + } + EXPECT_FALSE(iter->Valid()); +} + +void VerifySeek(TruncatedRangeDelIterator* iter, + const InternalKeyComparator& icmp, + const std::vector& test_cases) { + for (const auto& test_case : test_cases) { + iter->Seek(test_case.target); + if (test_case.invalid) { + ASSERT_FALSE(iter->Valid()); + } else { + ASSERT_TRUE(iter->Valid()); + EXPECT_EQ(0, icmp.Compare(iter->start_key(), test_case.start)); + EXPECT_EQ(0, icmp.Compare(iter->end_key(), test_case.end)); + EXPECT_EQ(test_case.seq, iter->seq()); + } + } +} + +void VerifySeekForPrev( + TruncatedRangeDelIterator* iter, const InternalKeyComparator& icmp, + const std::vector& test_cases) { + for (const auto& test_case : test_cases) { + iter->SeekForPrev(test_case.target); + if (test_case.invalid) { + ASSERT_FALSE(iter->Valid()); + } else { + ASSERT_TRUE(iter->Valid()); + EXPECT_EQ(0, icmp.Compare(iter->start_key(), test_case.start)); + EXPECT_EQ(0, icmp.Compare(iter->end_key(), test_case.end)); + EXPECT_EQ(test_case.seq, iter->seq()); + } + } +} + +void VerifyShouldDelete(RangeDelAggregatorV2* range_del_agg, + const std::vector& test_cases) { + for (const auto& test_case : test_cases) { + EXPECT_EQ( + test_case.result, + range_del_agg->ShouldDelete( + test_case.lookup_key, RangeDelPositioningMode::kForwardTraversal)); + } + for (auto it = test_cases.rbegin(); it != test_cases.rend(); ++it) { + const auto& test_case = *it; + EXPECT_EQ( + test_case.result, + range_del_agg->ShouldDelete( + test_case.lookup_key, RangeDelPositioningMode::kBackwardTraversal)); + } +} + +void VerifyIsRangeOverlapped( + RangeDelAggregatorV2* range_del_agg, + const std::vector& test_cases) { + for (const auto& test_case : test_cases) { + EXPECT_EQ(test_case.result, + range_del_agg->IsRangeOverlapped(test_case.start, test_case.end)); + } +} + +} // namespace + +TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) { + auto range_del_iter = MakeRangeDelIter({}); + FragmentedRangeTombstoneList fragment_list( + std::move(range_del_iter), bytewise_icmp, true /* one_time_use */); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber, + bytewise_icmp)); + + TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, + nullptr); + + iter.SeekToFirst(); + ASSERT_FALSE(iter.Valid()); + + iter.SeekToLast(); + ASSERT_FALSE(iter.Valid()); +} + +TEST_F(RangeDelAggregatorV2Test, UntruncatedIter) { + auto range_del_iter = + MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}}); + FragmentedRangeTombstoneList fragment_list( + std::move(range_del_iter), bytewise_icmp, false /* one_time_use */); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber, + bytewise_icmp)); + + TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, + nullptr); + + VerifyIterator(&iter, bytewise_icmp, + {{UncutEndpoint("a"), UncutEndpoint("e"), 10}, + {UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {UncutEndpoint("j"), UncutEndpoint("n"), 4}}); + + VerifySeek( + &iter, bytewise_icmp, + {{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, + {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, + {"", UncutEndpoint("a"), UncutEndpoint("e"), 10}}); + + VerifySeekForPrev( + &iter, bytewise_icmp, + {{"d", UncutEndpoint("a"), UncutEndpoint("e"), 10}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, + {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); +} + +TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) { + auto range_del_iter = + MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}}); + FragmentedRangeTombstoneList fragment_list( + std::move(range_del_iter), bytewise_icmp, false /* one_time_use */); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, 9 /* snapshot */, + bytewise_icmp)); + + TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr, + nullptr); + + VerifyIterator(&iter, bytewise_icmp, + {{UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {UncutEndpoint("j"), UncutEndpoint("n"), 4}}); + + VerifySeek( + &iter, bytewise_icmp, + {{"d", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("j"), UncutEndpoint("n"), 4}, + {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, + {"", UncutEndpoint("e"), UncutEndpoint("g"), 8}}); + + VerifySeekForPrev( + &iter, bytewise_icmp, + {{"d", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"n", UncutEndpoint("j"), UncutEndpoint("n"), 4}, + {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); +} + +TEST_F(RangeDelAggregatorV2Test, TruncatedIter) { + auto range_del_iter = + MakeRangeDelIter({{"a", "e", 10}, {"e", "g", 8}, {"j", "n", 4}}); + FragmentedRangeTombstoneList fragment_list( + std::move(range_del_iter), bytewise_icmp, false /* one_time_use */); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber, + bytewise_icmp)); + + InternalKey smallest("d", 7, kTypeValue); + InternalKey largest("m", 9, kTypeValue); + TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, + &smallest, &largest); + + VerifyIterator(&iter, bytewise_icmp, + {{InternalValue("d", 7), UncutEndpoint("e"), 10}, + {UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {UncutEndpoint("j"), InternalValue("m", 8), 4}}); + + VerifySeek( + &iter, bytewise_icmp, + {{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("j"), InternalValue("m", 8), 4}, + {"n", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}, + {"", InternalValue("d", 7), UncutEndpoint("e"), 10}}); + + VerifySeekForPrev( + &iter, bytewise_icmp, + {{"d", InternalValue("d", 7), UncutEndpoint("e"), 10}, + {"e", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"ia", UncutEndpoint("e"), UncutEndpoint("g"), 8}, + {"n", UncutEndpoint("j"), InternalValue("m", 8), 4}, + {"", UncutEndpoint(""), UncutEndpoint(""), 0, true /* invalid */}}); +} + +TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) { + auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 8}}); + FragmentedRangeTombstoneList fragment_list( + std::move(range_del_iter), bytewise_icmp, false /* one_time_use */); + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber, + bytewise_icmp)); + + RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); + range_del_agg.AddTombstones(std::move(input_iter)); + + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false}, + {InternalValue("b", 9), true}, + {InternalValue("d", 9), true}, + {InternalValue("e", 7), true}, + {InternalValue("g", 7), false}}); + + VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false}, + {"_", "a", true}, + {"a", "c", true}, + {"d", "f", true}, + {"g", "l", false}}); +} + +TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregator) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator( + fragment_list.get(), kMaxSequenceNumber, bytewise_icmp)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), true}, + {InternalValue("b", 19), false}, + {InternalValue("b", 9), true}, + {InternalValue("d", 9), true}, + {InternalValue("e", 7), true}, + {InternalValue("g", 7), false}, + {InternalValue("h", 24), true}, + {InternalValue("i", 24), false}, + {InternalValue("ii", 14), true}, + {InternalValue("j", 14), false}}); + + VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false}, + {"_", "a", true}, + {"a", "c", true}, + {"d", "f", true}, + {"g", "l", true}, + {"x", "y", false}}); +} + +TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregatorWithUpperBound) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "e", 10}, {"c", "g", 8}}, + {{"a", "b", 20}, {"h", "i", 25}, {"ii", "j", 15}}}); + + RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + for (const auto& fragment_list : fragment_lists) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), + 19 /* snapshot */, bytewise_icmp)); + range_del_agg.AddTombstones(std::move(input_iter)); + } + + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 19), false}, + {InternalValue("a", 9), true}, + {InternalValue("b", 9), true}, + {InternalValue("d", 9), true}, + {InternalValue("e", 7), true}, + {InternalValue("g", 7), false}, + {InternalValue("h", 24), false}, + {InternalValue("i", 24), false}, + {InternalValue("ii", 14), true}, + {InternalValue("j", 14), false}}); + + VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false}, + {"_", "a", true}, + {"a", "c", true}, + {"d", "f", true}, + {"g", "l", true}, + {"x", "y", false}}); +} + +TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregator) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "z", 10}}, {{"a", "z", 10}}, {{"a", "z", 10}}}); + std::vector> iter_bounds = { + {InternalKey("a", 4, kTypeValue), + InternalKey("m", kMaxSequenceNumber, kTypeRangeDeletion)}, + {InternalKey("m", 20, kTypeValue), + InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)}, + {InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}}; + + RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + for (size_t i = 0; i < fragment_lists.size(); i++) { + const auto& fragment_list = fragment_lists[i]; + const auto& bounds = iter_bounds[i]; + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_list.get(), + 19 /* snapshot */, bytewise_icmp)); + range_del_agg.AddTombstones(std::move(input_iter), &bounds.first, + &bounds.second); + } + + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 10), false}, + {InternalValue("a", 9), false}, + {InternalValue("a", 4), true}, + {InternalValue("m", 10), false}, + {InternalValue("m", 9), true}, + {InternalValue("x", 10), false}, + {InternalValue("x", 9), false}, + {InternalValue("x", 5), true}, + {InternalValue("z", 9), false}}); + + VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false}, + {"_", "a", true}, + {"a", "n", true}, + {"l", "x", true}, + {"w", "z", true}, + {"zzz", "zz", false}, + {"zz", "zzz", false}}); +} + +TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) { + auto fragment_lists = MakeFragmentedTombstoneLists( + {{{"a", "z", 10}}, {{"a", "z", 10}}, {{"a", "z", 10}}}); + std::vector> iter_bounds = { + {InternalKey("a", 4, kTypeValue), + InternalKey("m", kMaxSequenceNumber, kTypeRangeDeletion)}, + {InternalKey("m", 20, kTypeValue), + InternalKey("x", kMaxSequenceNumber, kTypeRangeDeletion)}, + {InternalKey("x", 5, kTypeValue), InternalKey("zz", 30, kTypeValue)}}; + + RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19); + + auto add_iter_to_agg = [&](size_t i) { + std::unique_ptr input_iter( + new FragmentedRangeTombstoneIterator(fragment_lists[i].get(), + 19 /* snapshot */, bytewise_icmp)); + range_del_agg.AddTombstones(std::move(input_iter), &iter_bounds[i].first, + &iter_bounds[i].second); + }; + + add_iter_to_agg(0); + VerifyShouldDelete(&range_del_agg, {{InternalValue("a", 10), false}, + {InternalValue("a", 9), false}, + {InternalValue("a", 4), true}}); + + add_iter_to_agg(1); + VerifyShouldDelete(&range_del_agg, {{InternalValue("m", 10), false}, + {InternalValue("m", 9), true}}); + + add_iter_to_agg(2); + VerifyShouldDelete(&range_del_agg, {{InternalValue("x", 10), false}, + {InternalValue("x", 9), false}, + {InternalValue("x", 5), true}, + {InternalValue("z", 9), false}}); + + VerifyIsRangeOverlapped(&range_del_agg, {{"", "_", false}, + {"_", "a", true}, + {"a", "n", true}, + {"l", "x", true}, + {"w", "z", true}, + {"zzz", "zz", false}, + {"zz", "zzz", false}}); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/range_tombstone_fragmenter.cc b/db/range_tombstone_fragmenter.cc index ce6251ccd7..df4c1b2f76 100644 --- a/db/range_tombstone_fragmenter.cc +++ b/db/range_tombstone_fragmenter.cc @@ -231,10 +231,33 @@ void FragmentedRangeTombstoneIterator::SeekToFirst() { seq_pos_ = tombstones_->seq_begin(); } +void FragmentedRangeTombstoneIterator::SeekToTopFirst() { + if (tombstones_->empty()) { + Invalidate(); + return; + } + pos_ = tombstones_->begin(); + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + ScanForwardToVisibleTombstone(); +} + void FragmentedRangeTombstoneIterator::SeekToLast() { - pos_ = tombstones_->end(); - seq_pos_ = tombstones_->seq_end(); - Prev(); + pos_ = std::prev(tombstones_->end()); + seq_pos_ = std::prev(tombstones_->seq_end()); +} + +void FragmentedRangeTombstoneIterator::SeekToTopLast() { + if (tombstones_->empty()) { + Invalidate(); + return; + } + pos_ = std::prev(tombstones_->end()); + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + ScanBackwardToVisibleTombstone(); } void FragmentedRangeTombstoneIterator::Seek(const Slice& target) { @@ -243,16 +266,7 @@ void FragmentedRangeTombstoneIterator::Seek(const Slice& target) { return; } SeekToCoveringTombstone(target); - while (pos_ != tombstones_->end() && - seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { - ++pos_; - if (pos_ == tombstones_->end()) { - return; - } - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - snapshot_, std::greater()); - } + ScanForwardToVisibleTombstone(); } void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) { @@ -261,17 +275,7 @@ void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) { return; } SeekForPrevToCoveringTombstone(target); - while (pos_ != tombstones_->end() && - seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { - if (pos_ == tombstones_->begin()) { - Invalidate(); - return; - } - --pos_; - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - snapshot_, std::greater()); - } + ScanBackwardToVisibleTombstone(); } void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone( @@ -307,6 +311,33 @@ void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone( snapshot_, std::greater()); } +void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() { + while (pos_ != tombstones_->end() && + seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { + ++pos_; + if (pos_ == tombstones_->end()) { + return; + } + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + } +} + +void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() { + while (pos_ != tombstones_->end() && + seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { + if (pos_ == tombstones_->begin()) { + Invalidate(); + return; + } + --pos_; + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + } +} + void FragmentedRangeTombstoneIterator::Next() { ++seq_pos_; if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) { @@ -314,6 +345,17 @@ void FragmentedRangeTombstoneIterator::Next() { } } +void FragmentedRangeTombstoneIterator::TopNext() { + ++pos_; + if (pos_ == tombstones_->end()) { + return; + } + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + ScanForwardToVisibleTombstone(); +} + void FragmentedRangeTombstoneIterator::Prev() { if (seq_pos_ == tombstones_->seq_begin()) { pos_ = tombstones_->end(); @@ -327,6 +369,18 @@ void FragmentedRangeTombstoneIterator::Prev() { } } +void FragmentedRangeTombstoneIterator::TopPrev() { + if (pos_ == tombstones_->begin()) { + Invalidate(); + return; + } + --pos_; + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + snapshot_, std::greater()); + ScanBackwardToVisibleTombstone(); +} + bool FragmentedRangeTombstoneIterator::Valid() const { return tombstones_ != nullptr && pos_ != tombstones_->end(); } diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h index 2d6ca691f6..a144eb079c 100644 --- a/db/range_tombstone_fragmenter.h +++ b/db/range_tombstone_fragmenter.h @@ -95,9 +95,13 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { FragmentedRangeTombstoneIterator( const std::shared_ptr& tombstones, SequenceNumber snapshot, const InternalKeyComparator& icmp); + void SeekToFirst() override; void SeekToLast() override; + void SeekToTopFirst(); + void SeekToTopLast(); + // NOTE: Seek and SeekForPrev do not behave in the way InternalIterator // seeking should behave. This is OK because they are not currently used, but // eventually FragmentedRangeTombstoneIterator should no longer implement @@ -114,6 +118,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { void Next() override; void Prev() override; + + void TopNext(); + void TopPrev(); + bool Valid() const override; Slice key() const override { MaybePinKey(); @@ -124,9 +132,30 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { bool IsValuePinned() const override { return true; } Status status() const override { return Status::OK(); } + bool empty() const { return tombstones_->empty(); } + void Invalidate() { + pos_ = tombstones_->end(); + seq_pos_ = tombstones_->seq_end(); + } + + // TODO: implement properly + RangeTombstone tombstone() const { + return RangeTombstone(start_key(), end_key(), seq()); + } Slice start_key() const { return pos_->start_key; } Slice end_key() const { return pos_->end_key; } SequenceNumber seq() const { return *seq_pos_; } + ParsedInternalKey parsed_start_key() const { + return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber, + kTypeRangeDeletion); + } + ParsedInternalKey parsed_end_key() const { + return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber, + kTypeRangeDeletion); + } + ParsedInternalKey internal_key() const { + return ParsedInternalKey(pos_->start_key, *seq_pos_, kTypeRangeDeletion); + } SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key); @@ -182,10 +211,8 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { void SeekToCoveringTombstone(const Slice& key); void SeekForPrevToCoveringTombstone(const Slice& key); - void Invalidate() { - pos_ = tombstones_->end(); - seq_pos_ = tombstones_->seq_end(); - } + void ScanForwardToVisibleTombstone(); + void ScanBackwardToVisibleTombstone(); bool ValidPos() const { return Valid() && seq_pos_ != tombstones_->seq_iter(pos_->seq_end_idx); } diff --git a/db/range_tombstone_fragmenter_test.cc b/db/range_tombstone_fragmenter_test.cc index f9ea356542..9d71c4f714 100644 --- a/db/range_tombstone_fragmenter_test.cc +++ b/db/range_tombstone_fragmenter_test.cc @@ -42,6 +42,19 @@ void VerifyFragmentedRangeDels( EXPECT_FALSE(iter->Valid()); } +void VerifyVisibleTombstones( + FragmentedRangeTombstoneIterator* iter, + const std::vector& expected_tombstones) { + iter->SeekToTopFirst(); + for (size_t i = 0; i < expected_tombstones.size() && iter->Valid(); + i++, iter->TopNext()) { + EXPECT_EQ(iter->start_key(), expected_tombstones[i].start_key_); + EXPECT_EQ(iter->value(), expected_tombstones[i].end_key_); + EXPECT_EQ(iter->seq(), expected_tombstones[i].seq_); + } + EXPECT_FALSE(iter->Valid()); +} + struct SeekTestCase { Slice seek_target; RangeTombstone expected_position; @@ -262,14 +275,37 @@ TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyMultiUse) { {"j", "l", 2}, {"l", "n", 4}}); } + + VerifyVisibleTombstones(&iter1, {{"a", "c", 10}, + {"c", "e", 10}, + {"e", "g", 8}, + {"g", "i", 6}, + {"j", "l", 4}, + {"l", "n", 4}}); VerifyMaxCoveringTombstoneSeqnum( &iter1, {{"a", 10}, {"c", 10}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}}); + + VerifyVisibleTombstones(&iter2, {{"c", "e", 8}, + {"e", "g", 8}, + {"g", "i", 6}, + {"j", "l", 4}, + {"l", "n", 4}}); VerifyMaxCoveringTombstoneSeqnum( &iter2, {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}}); + + VerifyVisibleTombstones(&iter3, {{"c", "e", 6}, + {"e", "g", 6}, + {"g", "i", 6}, + {"j", "l", 4}, + {"l", "n", 4}}); VerifyMaxCoveringTombstoneSeqnum( &iter3, {{"a", 0}, {"c", 6}, {"e", 6}, {"i", 0}, {"j", 4}, {"m", 4}}); + + VerifyVisibleTombstones(&iter4, {{"j", "l", 4}, {"l", "n", 4}}); VerifyMaxCoveringTombstoneSeqnum( &iter4, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 4}, {"m", 4}}); + + VerifyVisibleTombstones(&iter5, {{"j", "l", 2}}); VerifyMaxCoveringTombstoneSeqnum( &iter5, {{"a", 0}, {"c", 0}, {"e", 0}, {"i", 0}, {"j", 2}, {"m", 0}}); } diff --git a/db/table_cache.cc b/db/table_cache.cc index 7c558b7850..b491a162bd 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -183,7 +183,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, InternalIterator* TableCache::NewIterator( const ReadOptions& options, const EnvOptions& env_options, const InternalKeyComparator& icomparator, const FileMetaData& file_meta, - RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor, + RangeDelAggregatorV2* range_del_agg, const SliceTransform* prefix_extractor, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, bool for_compaction, Arena* arena, bool skip_filters, int level, const InternalKey* smallest_compaction_key, @@ -264,8 +264,9 @@ InternalIterator* TableCache::NewIterator( } if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) { if (range_del_agg->AddFile(fd.GetNumber())) { - std::unique_ptr range_del_iter( - table_reader->NewRangeTombstoneIterator(options)); + std::unique_ptr range_del_iter( + static_cast( + table_reader->NewRangeTombstoneIterator(options))); if (range_del_iter != nullptr) { s = range_del_iter->status(); } @@ -278,8 +279,8 @@ InternalIterator* TableCache::NewIterator( if (largest_compaction_key != nullptr) { largest = largest_compaction_key; } - s = range_del_agg->AddTombstones(std::move(range_del_iter), smallest, - largest); + range_del_agg->AddTombstones(std::move(range_del_iter), smallest, + largest); } } } diff --git a/db/table_cache.h b/db/table_cache.h index e3936ab44a..04485c4dcc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -15,7 +15,7 @@ #include #include "db/dbformat.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "options/cf_options.h" #include "port/port.h" #include "rocksdb/cache.h" @@ -52,7 +52,7 @@ class TableCache { InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, - const FileMetaData& file_meta, RangeDelAggregator* range_del_agg, + const FileMetaData& file_meta, RangeDelAggregatorV2* range_del_agg, const SliceTransform* prefix_extractor = nullptr, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, diff --git a/db/version_set.cc b/db/version_set.cc index 1856a6be6a..fc758e7ea2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -454,15 +454,14 @@ namespace { class LevelIterator final : public InternalIterator { public: - LevelIterator(TableCache* table_cache, const ReadOptions& read_options, - const EnvOptions& env_options, - const InternalKeyComparator& icomparator, - const LevelFilesBrief* flevel, - const SliceTransform* prefix_extractor, bool should_sample, - HistogramImpl* file_read_hist, bool for_compaction, - bool skip_filters, int level, RangeDelAggregator* range_del_agg, - const std::vector* - compaction_boundaries = nullptr) + LevelIterator( + TableCache* table_cache, const ReadOptions& read_options, + const EnvOptions& env_options, const InternalKeyComparator& icomparator, + const LevelFilesBrief* flevel, const SliceTransform* prefix_extractor, + bool should_sample, HistogramImpl* file_read_hist, bool for_compaction, + bool skip_filters, int level, RangeDelAggregatorV2* range_del_agg, + const std::vector* compaction_boundaries = + nullptr) : table_cache_(table_cache), read_options_(read_options), env_options_(env_options), @@ -572,7 +571,7 @@ class LevelIterator final : public InternalIterator { bool skip_filters_; size_t file_index_; int level_; - RangeDelAggregator* range_del_agg_; + RangeDelAggregatorV2* range_del_agg_; IteratorWrapper file_iter_; // May be nullptr PinnedIteratorsManager* pinned_iters_mgr_; @@ -986,7 +985,7 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( void Version::AddIterators(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - RangeDelAggregator* range_del_agg) { + RangeDelAggregatorV2* range_del_agg) { assert(storage_info_.finalized_); for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { @@ -999,7 +998,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder, int level, - RangeDelAggregator* range_del_agg) { + RangeDelAggregatorV2* range_del_agg) { assert(storage_info_.finalized_); if (level >= storage_info_.num_non_empty_levels()) { // This is an empty level @@ -1058,7 +1057,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, Arena arena; Status status; - RangeDelAggregator range_del_agg(icmp, {}, false); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); *overlap = false; @@ -4254,7 +4254,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } InternalIterator* VersionSet::MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg, + const Compaction* c, RangeDelAggregatorV2* range_del_agg, const EnvOptions& env_options_compactions) { auto cfd = c->column_family_data(); ReadOptions read_options; diff --git a/db/version_set.h b/db/version_set.h index b50f653ba4..ec9084beb6 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -34,7 +34,7 @@ #include "db/dbformat.h" #include "db/file_indexer.h" #include "db/log_reader.h" -#include "db/range_del_aggregator.h" +#include "db/range_del_aggregator_v2.h" #include "db/read_callback.h" #include "db/table_cache.h" #include "db/version_builder.h" @@ -538,11 +538,11 @@ class Version { // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, const EnvOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - RangeDelAggregator* range_del_agg); + RangeDelAggregatorV2* range_del_agg); void AddIteratorsForLevel(const ReadOptions&, const EnvOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - int level, RangeDelAggregator* range_del_agg); + int level, RangeDelAggregatorV2* range_del_agg); Status OverlapWithLevelIterator(const ReadOptions&, const EnvOptions&, const Slice& smallest_user_key, @@ -935,7 +935,7 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. InternalIterator* MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg, + const Compaction* c, RangeDelAggregatorV2* range_del_agg, const EnvOptions& env_options_compactions); // Add all files listed in any live version to *live. diff --git a/src.mk b/src.mk index cfe9dcd2f2..4f895b981d 100644 --- a/src.mk +++ b/src.mk @@ -43,6 +43,7 @@ LIB_SOURCES = \ db/merge_helper.cc \ db/merge_operator.cc \ db/range_del_aggregator.cc \ + db/range_del_aggregator_v2.cc \ db/range_tombstone_fragmenter.cc \ db/repair.cc \ db/snapshot_impl.cc \ @@ -331,6 +332,7 @@ MAIN_SOURCES = \ db/repair_test.cc \ db/range_del_aggregator_test.cc \ db/range_del_aggregator_bench.cc \ + db/range_del_aggregator_v2_test.cc \ db/range_tombstone_fragmenter_test.cc \ db/table_properties_collector_test.cc \ db/util_merge_operators_test.cc \ diff --git a/utilities/debug.cc b/utilities/debug.cc index e0c5f5566e..5ea7205a9f 100644 --- a/utilities/debug.cc +++ b/utilities/debug.cc @@ -19,7 +19,8 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key, DBImpl* idb = static_cast(db->GetRootDB()); auto icmp = InternalKeyComparator(idb->GetOptions().comparator); - RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); + RangeDelAggregatorV2 range_del_agg(&icmp, + kMaxSequenceNumber /* upper_bound */); Arena arena; ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg));