mirror of https://github.com/facebook/rocksdb.git
Improve FragmentTombstones() speed by lazily initializing `seq_set_` (#10848)
Summary: FragmentedRangeTombstoneList has a member variable `seq_set_` that contains the sequence numbers of all range tombstones in a set. The set is constructed in `FragmentTombstones()` and is used only in `FragmentedRangeTombstoneList::ContainsRange()` which only happens during compaction. This PR moves the initialization of `seq_set_` to `FragmentedRangeTombstoneList::ContainsRange()`. This should speed up `FragmentTombstones()` when the range tombstone list is used for read/scan requests. Microbench shows the speed improvement to be ~45%. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10848 Test Plan: - Existing tests and stress test: `python3 tools/db_crashtest.py whitebox --simple --verify_iterator_with_expected_state_one_in=5`. - Microbench: update `range_del_aggregator_bench` to benchmark speed of `FragmentTombstones()`: ``` ./range_del_aggregator_bench --num_range_tombstones=1000 --tombstone_start_upper_bound=50000000 --num_runs=10000 --tombstone_width_mean=200 --should_deletes_per_run=100 --use_compaction_range_del_aggregator=true Before this PR: ========================= Fragment Tombstones: 270.286 us AddTombstones: 1.28933 us ShouldDelete (first): 0.525528 us ShouldDelete (rest): 0.0797519 us After this PR: time to fragment tombstones is pushed to AddTombstones() which only happen during compaction. ========================= Fragment Tombstones: 149.879 us AddTombstones: 102.131 us ShouldDelete (first): 0.565871 us ShouldDelete (rest): 0.0729444 us ``` - db_bench: this should improve speed for fragmenting range tombstones for mutable memtable: ``` ./db_bench --benchmarks=readwhilewriting --writes_per_range_tombstone=100 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=500000 --reads=250000 --disable_auto_compactions --max_num_range_tombstones=100000 --finish_after_writes --write_buffer_size=1073741824 --threads=25 Before this PR: readwhilewriting : 18.301 micros/op 1310445 ops/sec 4.769 seconds 6250000 operations; 28.1 MB/s (41001 of 250000 found) After this PR: readwhilewriting : 16.943 micros/op 1439376 ops/sec 4.342 seconds 6250000 operations; 23.8 MB/s (28977 of 250000 found) ``` Reviewed By: ajkr Differential Revision: D40646227 Pulled By: cbi42 fbshipit-source-id: ea471667edb258f67d01cfd828588e80a89e4083
This commit is contained in:
parent
fc74abb436
commit
7a95938899
|
@ -587,10 +587,9 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
|
|||
auto* unfragmented_iter =
|
||||
new MemTableIterator(*this, read_options, nullptr /* arena */,
|
||||
true /* use_range_del_table */);
|
||||
cache->tombstones = std::make_unique<FragmentedRangeTombstoneList>(
|
||||
FragmentedRangeTombstoneList(
|
||||
std::unique_ptr<InternalIterator>(unfragmented_iter),
|
||||
comparator_.comparator));
|
||||
cache->tombstones.reset(new FragmentedRangeTombstoneList(
|
||||
std::unique_ptr<InternalIterator>(unfragmented_iter),
|
||||
comparator_.comparator));
|
||||
cache->initialized.store(true, std::memory_order_release);
|
||||
}
|
||||
cache->reader_mutex.unlock();
|
||||
|
|
|
@ -54,12 +54,17 @@ DEFINE_int32(should_deletes_per_run, 1, "number of ShouldDelete calls per run");
|
|||
DEFINE_int32(add_tombstones_per_run, 1,
|
||||
"number of AddTombstones calls per run");
|
||||
|
||||
DEFINE_bool(use_compaction_range_del_aggregator, false,
|
||||
"Whether to use CompactionRangeDelAggregator. Default is to use "
|
||||
"ReadRangeDelAggregator.");
|
||||
|
||||
namespace {
|
||||
|
||||
struct Stats {
|
||||
uint64_t time_add_tombstones = 0;
|
||||
uint64_t time_first_should_delete = 0;
|
||||
uint64_t time_rest_should_delete = 0;
|
||||
uint64_t time_fragment_tombstones = 0;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const Stats& s) {
|
||||
|
@ -67,6 +72,10 @@ std::ostream& operator<<(std::ostream& os, const Stats& s) {
|
|||
fmt_holder.copyfmt(os);
|
||||
|
||||
os << std::left;
|
||||
os << std::setw(25) << "Fragment Tombstones: "
|
||||
<< s.time_fragment_tombstones /
|
||||
(FLAGS_add_tombstones_per_run * FLAGS_num_runs * 1.0e3)
|
||||
<< " us\n";
|
||||
os << std::setw(25) << "AddTombstones: "
|
||||
<< s.time_add_tombstones /
|
||||
(FLAGS_add_tombstones_per_run * FLAGS_num_runs * 1.0e3)
|
||||
|
@ -186,10 +195,17 @@ int main(int argc, char** argv) {
|
|||
FLAGS_num_range_tombstones);
|
||||
}
|
||||
auto mode = ROCKSDB_NAMESPACE::RangeDelPositioningMode::kForwardTraversal;
|
||||
|
||||
std::vector<ROCKSDB_NAMESPACE::SequenceNumber> snapshots{0};
|
||||
for (int i = 0; i < FLAGS_num_runs; i++) {
|
||||
ROCKSDB_NAMESPACE::ReadRangeDelAggregator range_del_agg(
|
||||
&icmp, ROCKSDB_NAMESPACE::kMaxSequenceNumber /* upper_bound */);
|
||||
std::unique_ptr<ROCKSDB_NAMESPACE::RangeDelAggregator> range_del_agg =
|
||||
nullptr;
|
||||
if (FLAGS_use_compaction_range_del_aggregator) {
|
||||
range_del_agg.reset(new ROCKSDB_NAMESPACE::CompactionRangeDelAggregator(
|
||||
&icmp, snapshots));
|
||||
} else {
|
||||
range_del_agg.reset(new ROCKSDB_NAMESPACE::ReadRangeDelAggregator(
|
||||
&icmp, ROCKSDB_NAMESPACE::kMaxSequenceNumber /* upper_bound */));
|
||||
}
|
||||
|
||||
std::vector<
|
||||
std::unique_ptr<ROCKSDB_NAMESPACE::FragmentedRangeTombstoneList> >
|
||||
|
@ -207,12 +223,16 @@ int main(int argc, char** argv) {
|
|||
ROCKSDB_NAMESPACE::PersistentRangeTombstone(
|
||||
ROCKSDB_NAMESPACE::Key(start), ROCKSDB_NAMESPACE::Key(end), j);
|
||||
}
|
||||
|
||||
auto iter =
|
||||
ROCKSDB_NAMESPACE::MakeRangeDelIterator(persistent_range_tombstones);
|
||||
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_fragment_tombstones(
|
||||
clock, true /* auto_start */);
|
||||
fragmented_range_tombstone_lists.emplace_back(
|
||||
new ROCKSDB_NAMESPACE::FragmentedRangeTombstoneList(
|
||||
ROCKSDB_NAMESPACE::MakeRangeDelIterator(
|
||||
persistent_range_tombstones),
|
||||
icmp));
|
||||
std::move(iter), icmp, FLAGS_use_compaction_range_del_aggregator,
|
||||
snapshots));
|
||||
stats.time_fragment_tombstones +=
|
||||
stop_watch_fragment_tombstones.ElapsedNanos();
|
||||
std::unique_ptr<ROCKSDB_NAMESPACE::FragmentedRangeTombstoneIterator>
|
||||
fragmented_range_del_iter(
|
||||
new ROCKSDB_NAMESPACE::FragmentedRangeTombstoneIterator(
|
||||
|
@ -221,7 +241,7 @@ int main(int argc, char** argv) {
|
|||
|
||||
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_add_tombstones(
|
||||
clock, true /* auto_start */);
|
||||
range_del_agg.AddTombstones(std::move(fragmented_range_del_iter));
|
||||
range_del_agg->AddTombstones(std::move(fragmented_range_del_iter));
|
||||
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos();
|
||||
}
|
||||
|
||||
|
@ -238,7 +258,7 @@ int main(int argc, char** argv) {
|
|||
|
||||
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_should_delete(
|
||||
clock, true /* auto_start */);
|
||||
range_del_agg.ShouldDelete(parsed_key, mode);
|
||||
range_del_agg->ShouldDelete(parsed_key, mode);
|
||||
uint64_t call_time = stop_watch_should_delete.ElapsedNanos();
|
||||
|
||||
if (j == 0) {
|
||||
|
|
|
@ -156,7 +156,6 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
|||
if (seq <= next_snapshot) {
|
||||
// This seqnum is visible by a lower snapshot.
|
||||
tombstone_seqs_.push_back(seq);
|
||||
seq_set_.insert(seq);
|
||||
auto upper_bound_it =
|
||||
std::lower_bound(snapshots.begin(), snapshots.end(), seq);
|
||||
if (upper_bound_it == snapshots.begin()) {
|
||||
|
@ -173,7 +172,6 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
|||
// The fragmentation is being done for reads, so preserve all seqnums.
|
||||
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
|
||||
seqnums_to_flush.end());
|
||||
seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
|
||||
if (ts_sz) {
|
||||
tombstone_timestamps_.insert(tombstone_timestamps_.end(),
|
||||
timestamps_to_flush.begin(),
|
||||
|
@ -258,15 +256,20 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
|
|||
}
|
||||
|
||||
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
|
||||
SequenceNumber upper) const {
|
||||
SequenceNumber upper) {
|
||||
std::call_once(seq_set_init_once_flag_, [this]() {
|
||||
for (auto s : tombstone_seqs_) {
|
||||
seq_set_.insert(s);
|
||||
}
|
||||
});
|
||||
auto seq_it = seq_set_.lower_bound(lower);
|
||||
return seq_it != seq_set_.end() && *seq_it <= upper;
|
||||
}
|
||||
|
||||
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
|
||||
const FragmentedRangeTombstoneList* tombstones,
|
||||
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
|
||||
const Slice* ts_upper_bound, SequenceNumber _lower_bound)
|
||||
FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp,
|
||||
SequenceNumber _upper_bound, const Slice* ts_upper_bound,
|
||||
SequenceNumber _lower_bound)
|
||||
: tombstone_start_cmp_(icmp.user_comparator()),
|
||||
tombstone_end_cmp_(icmp.user_comparator()),
|
||||
icmp_(&icmp),
|
||||
|
@ -280,7 +283,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
|
|||
}
|
||||
|
||||
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
|
||||
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
|
||||
const std::shared_ptr<FragmentedRangeTombstoneList>& tombstones,
|
||||
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
|
||||
const Slice* ts_upper_bound, SequenceNumber _lower_bound)
|
||||
: tombstone_start_cmp_(icmp.user_comparator()),
|
||||
|
|
|
@ -84,7 +84,9 @@ struct FragmentedRangeTombstoneList {
|
|||
|
||||
// Returns true if the stored tombstones contain with one with a sequence
|
||||
// number in [lower, upper].
|
||||
bool ContainsRange(SequenceNumber lower, SequenceNumber upper) const;
|
||||
// This method is not const as it internally lazy initialize a set of
|
||||
// sequence numbers (`seq_set_`).
|
||||
bool ContainsRange(SequenceNumber lower, SequenceNumber upper);
|
||||
|
||||
uint64_t num_unfragmented_tombstones() const {
|
||||
return num_unfragmented_tombstones_;
|
||||
|
@ -113,6 +115,7 @@ struct FragmentedRangeTombstoneList {
|
|||
std::vector<RangeTombstoneStack> tombstones_;
|
||||
std::vector<SequenceNumber> tombstone_seqs_;
|
||||
std::vector<Slice> tombstone_timestamps_;
|
||||
std::once_flag seq_set_init_once_flag_;
|
||||
std::set<SequenceNumber> seq_set_;
|
||||
std::list<std::string> pinned_slices_;
|
||||
PinnedIteratorsManager pinned_iters_mgr_;
|
||||
|
@ -131,12 +134,13 @@ struct FragmentedRangeTombstoneList {
|
|||
// tombstone collapsing is always O(n log n).
|
||||
class FragmentedRangeTombstoneIterator : public InternalIterator {
|
||||
public:
|
||||
FragmentedRangeTombstoneIterator(FragmentedRangeTombstoneList* tombstones,
|
||||
const InternalKeyComparator& icmp,
|
||||
SequenceNumber upper_bound,
|
||||
const Slice* ts_upper_bound = nullptr,
|
||||
SequenceNumber lower_bound = 0);
|
||||
FragmentedRangeTombstoneIterator(
|
||||
const FragmentedRangeTombstoneList* tombstones,
|
||||
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
|
||||
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
|
||||
FragmentedRangeTombstoneIterator(
|
||||
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
|
||||
const std::shared_ptr<FragmentedRangeTombstoneList>& tombstones,
|
||||
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
|
||||
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
|
||||
FragmentedRangeTombstoneIterator(
|
||||
|
@ -311,9 +315,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
|
|||
const RangeTombstoneStackEndComparator tombstone_end_cmp_;
|
||||
const InternalKeyComparator* icmp_;
|
||||
const Comparator* ucmp_;
|
||||
std::shared_ptr<const FragmentedRangeTombstoneList> tombstones_ref_;
|
||||
std::shared_ptr<FragmentedRangeTombstoneList> tombstones_ref_;
|
||||
std::shared_ptr<FragmentedRangeTombstoneListCache> tombstones_cache_ref_;
|
||||
const FragmentedRangeTombstoneList* tombstones_;
|
||||
FragmentedRangeTombstoneList* tombstones_;
|
||||
SequenceNumber upper_bound_;
|
||||
SequenceNumber lower_bound_;
|
||||
// Only consider timestamps <= ts_upper_bound_.
|
||||
|
|
|
@ -597,7 +597,7 @@ struct BlockBasedTable::Rep {
|
|||
bool prefix_filtering;
|
||||
std::shared_ptr<const SliceTransform> table_prefix_extractor;
|
||||
|
||||
std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels;
|
||||
std::shared_ptr<FragmentedRangeTombstoneList> fragmented_range_dels;
|
||||
|
||||
// If global_seqno is used, all Keys in this file will have the same
|
||||
// seqno with value `global_seqno`.
|
||||
|
|
Loading…
Reference in New Issue