Fragment memtable range tombstone in the write path (#10380)

Summary:
- Right now each read fragments the memtable range tombstones https://github.com/facebook/rocksdb/issues/4808. This PR explores the idea of fragmenting memtable range tombstones in the write path and reads can just read this cached fragmented tombstone without any fragmenting cost. This PR only does the caching for immutable memtable, and does so right before a memtable is added to an immutable memtable list. The fragmentation is done without holding mutex to minimize its performance impact.
- db_bench is updated to print out the number of range deletions executed if there is any.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10380

Test Plan:
- CI, added asserts in various places to check whether a fragmented range tombstone list should have been constructed.
- Benchmark: as this PR only optimizes immutable memtable path, the number of writes in the benchmark is chosen such  an immutable memtable is created and range tombstones are in that memtable.

```
single thread:
./db_bench --benchmarks=fillrandom,readrandom --writes_per_range_tombstone=1 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=500000 --reads=100000 --max_num_range_tombstones=100

multi_thread
./db_bench --benchmarks=fillrandom,readrandom --writes_per_range_tombstone=1 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=15000 --reads=20000 --threads=32 --max_num_range_tombstones=100
```
Commit 99cdf16464 is included in benchmark result. It was an earlier attempt where tombstones are fragmented for each write operation. Reader threads share it using a shared_ptr which would slow down multi-thread read performance as seen in benchmark results.
Results are averaged over 5 runs.

Single thread result:
| Max # tombstones  | main fillrandom micros/op | 99cdf16464 | Post PR | main readrandom micros/op |  99cdf16464 | Post PR |
| ------------- | ------------- |------------- |------------- |------------- |------------- |------------- |
| 0    |6.68     |6.57     |6.72     |4.72     |4.79     |4.54     |
| 1    |6.67     |6.58     |6.62     |5.41     |4.74     |4.72     |
| 10   |6.59     |6.5      |6.56     |7.83     |4.69     |4.59     |
| 100  |6.62     |6.75     |6.58     |29.57    |5.04     |5.09     |
| 1000 |6.54     |6.82     |6.61     |320.33   |5.22     |5.21     |

32-thread result: note that "Max # tombstones" is per thread.
| Max # tombstones  | main fillrandom micros/op | 99cdf16464 | Post PR | main readrandom micros/op |  99cdf16464 | Post PR |
| ------------- | ------------- |------------- |------------- |------------- |------------- |------------- |
| 0    |234.52   |260.25   |239.42   |5.06     |5.38     |5.09     |
| 1    |236.46   |262.0    |231.1    |19.57    |22.14    |5.45     |
| 10   |236.95   |263.84   |251.49   |151.73   |21.61    |5.73     |
| 100  |268.16   |296.8    |280.13   |2308.52  |22.27    |6.57     |

Reviewed By: ajkr

Differential Revision: D37916564

Pulled By: cbi42

fbshipit-source-id: 05d6d2e16df26c374c57ddcca13a5bfe9d5b731e
This commit is contained in:
Changyu Bi 2022-08-05 12:02:33 -07:00 committed by Facebook GitHub Bot
parent f28d0c2020
commit 9d77bf8f7b
20 changed files with 208 additions and 65 deletions

View File

@ -28,6 +28,9 @@
* PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env.
* Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.
### Performance Improvements
* Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.
## 7.5.0 (07/15/2022)
### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.

View File

@ -89,8 +89,8 @@ Status ArenaWrappedDBIter::Refresh() {
ReadRangeDelAggregator* range_del_agg =
db_iter_->GetRangeDelAggregator();
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
range_del_iter.reset(
sv->mem->NewRangeTombstoneIterator(read_options_, latest_seq));
range_del_iter.reset(sv->mem->NewRangeTombstoneIterator(
read_options_, latest_seq, false /* immutable_memtable */));
range_del_agg->AddTombstones(std::move(range_del_iter));
cfd_->ReturnThreadLocalSuperVersion(sv);
}

View File

@ -1149,8 +1149,8 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
auto read_seq = super_version->current->version_set()->LastSequence();
ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
auto* active_range_del_iter =
super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
read_opts, read_seq, false /* immutable_memtable */);
range_del_agg.AddTombstones(
std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
Status status;

View File

@ -1761,8 +1761,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
Status s;
if (!read_options.ignore_range_deletions) {
range_del_iter.reset(
super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
range_del_iter.reset(super_version->mem->NewRangeTombstoneIterator(
read_options, sequence, false /* immutable_memtable */));
range_del_agg->AddTombstones(std::move(range_del_iter));
}
// Collect all needed child iterators for immutable memtables
@ -1982,7 +1982,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
read_options, false /* immutable_memtable */,
get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();
@ -2002,7 +2003,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, nullptr, nullptr, false)) {
read_options, false /* immutable_memtable */, nullptr,
nullptr, false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
@ -2252,6 +2254,7 @@ std::vector<Status> DBImpl::MultiGet(
if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */,
read_callback)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
@ -2788,7 +2791,8 @@ Status DBImpl::MultiGetImpl(
(read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
if (!skip_memtable) {
super_version->mem->MultiGet(read_options, &range, callback);
super_version->mem->MultiGet(read_options, &range, callback,
false /* immutable_memtable */);
if (!range.empty()) {
super_version->imm->MultiGet(read_options, &range, callback);
}
@ -4859,7 +4863,8 @@ Status DBImpl::GetLatestSequenceForKey(
// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);
false /* immutable_memtable */, nullptr /*read_callback*/,
is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.

View File

@ -1546,7 +1546,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter =
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
// This is called during recovery, where a live memtable is flushed
// directly. In this case, no fragmented tombstone list is cached in
// this memtable yet.
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
false /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}

View File

@ -87,7 +87,8 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
PERF_TIMER_STOP(get_snapshot_time);
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, &read_cb)) {
read_options, false /* immutable_memtable */,
&read_cb)) {
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else {

View File

@ -261,6 +261,7 @@ Status DBImplSecondary::RecoverLogFiles(
MemTable* new_mem =
cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
cfd->mem()->SetNextLogNumber(log_number);
cfd->mem()->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
new_mem->Ref();
cfd->SetMemtable(new_mem);
@ -391,7 +392,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, &read_cb)) {
read_options, false /* immutable_memtable */,
&read_cb)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);

View File

@ -2065,6 +2065,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
"[%s] New memtable created with log file: #%" PRIu64
". Immutable memtables: %d.\n",
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
// There should be no concurrent write as the thread is at the front of
// writer queue
cfd->mem()->ConstructFragmentedRangeTombstones();
mutex_.Lock();
if (recycle_log_number != 0) {
// Since renaming the file is done outside DB mutex, we need to ensure

View File

@ -263,7 +263,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey("key", kMaxSequenceNumber);
bool res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status,
&merge_context, &max_covering_tombstone_seq, roptions);
&merge_context, &max_covering_tombstone_seq, roptions,
false /* immutable_memtable */);
ASSERT_OK(status);
ASSERT_TRUE(res);
uint64_t ivalue = DecodeFixed64(Slice(value).data());

View File

@ -390,7 +390,8 @@ Status FlushJob::MemPurge() {
range_del_iters;
for (MemTable* m : mems_) {
memtables.push_back(m->NewIterator(ro, &arena));
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
@ -585,6 +586,8 @@ Status FlushJob::MemPurge() {
// as in need of being flushed.
if (new_mem->ApproximateMemoryUsage() < maxSize &&
!(new_mem->ShouldFlushNow())) {
// Construct fragmented memtable range tombstones without mutex
new_mem->ConstructFragmentedRangeTombstones();
db_mutex_->Lock();
uint64_t new_mem_id = mems_[0]->GetID();
@ -732,7 +735,8 @@ bool FlushJob::MemPurgeDecider(double threshold) {
// Estimate if the sample entry is valid or not.
get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro);
&max_covering_tombstone_seq, &sqno, ro,
true /* immutable_memtable */);
if (!get_res) {
ROCKS_LOG_WARN(
db_options_.info_log,
@ -773,7 +777,8 @@ bool FlushJob::MemPurgeDecider(double threshold) {
next_mem_iter != std::end(mems_); next_mem_iter++) {
if ((*next_mem_iter)
->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro)) {
&max_covering_tombstone_seq, &sqno, ro,
true /* immutable_memtable */)) {
not_in_next_mems = false;
break;
}
@ -857,8 +862,8 @@ Status FlushJob::WriteLevel0Table() {
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
auto* range_del_iter =
m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}

View File

@ -242,6 +242,7 @@ TEST_F(FlushJobTest, NonEmpty) {
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
for (auto& m : to_delete) {
delete m;
@ -303,6 +304,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
autovector<MemTable*> to_delete;
for (auto mem : new_mems) {
mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(mem, &to_delete);
}
@ -372,7 +374,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value,
nullptr /* kv_prot_info */));
}
mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(mem, &to_delete);
}
largest_seqs.push_back(curr_seqno - 1);
@ -505,6 +507,7 @@ TEST_F(FlushJobTest, Snapshots) {
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
for (auto& m : to_delete) {
delete m;
@ -559,6 +562,7 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
autovector<MemTable*> to_delete;
for (auto mem : new_mems) {
mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(mem, &to_delete);
}
@ -638,6 +642,7 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) {
SequenceNumber seq = (curr_seq_++);
AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
ValueType::kTypeDeletionWithTimestamp, "");
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
}
@ -690,6 +695,7 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
ValueType::kTypeValue, "0_value");
}
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
}

View File

@ -668,7 +668,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
if (!read_options_.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
sv_->mem->NewRangeTombstoneIterator(
read_options_, sv_->current->version_set()->LastSequence()));
read_options_, sv_->current->version_set()->LastSequence(),
false /* immutable_memtable */));
range_del_agg.AddTombstones(std::move(range_del_iter));
// Always return Status::OK().
Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
@ -733,7 +734,8 @@ void ForwardIterator::RenewIterators() {
if (!read_options_.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
svnew->mem->NewRangeTombstoneIterator(
read_options_, sv_->current->version_set()->LastSequence()));
read_options_, sv_->current->version_set()->LastSequence(),
false /* immutable_memtable */));
range_del_agg.AddTombstones(std::move(range_del_iter));
// Always return Status::OK().
Status temp_s = svnew->imm->AddRangeTombstoneIterators(

View File

@ -445,16 +445,28 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
}
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq) {
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable) {
if (read_options.ignore_range_deletions ||
is_range_del_table_empty_.load(std::memory_order_relaxed)) {
return nullptr;
}
return NewRangeTombstoneIteratorInternal(read_options, read_seq);
return NewRangeTombstoneIteratorInternal(read_options, read_seq,
immutable_memtable);
}
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
const ReadOptions& read_options, SequenceNumber read_seq) {
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable) {
if (immutable_memtable) {
// Note that caller should already have verified that
// !is_range_del_table_empty_
assert(IsFragmentedRangeTombstonesConstructed());
return new FragmentedRangeTombstoneIterator(
fragmented_range_tombstone_list_.get(), comparator_.comparator,
read_seq);
}
auto* unfragmented_iter = new MemTableIterator(
*this, read_options, nullptr /* arena */, true /* use_range_del_table */);
auto fragmented_tombstone_list =
@ -467,6 +479,21 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
return fragmented_iter;
}
void MemTable::ConstructFragmentedRangeTombstones() {
assert(!IsFragmentedRangeTombstonesConstructed(false));
// There should be no concurrent Construction
if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
auto* unfragmented_iter =
new MemTableIterator(*this, ReadOptions(), nullptr /* arena */,
true /* use_range_del_table */);
fragmented_range_tombstone_list_ =
std::make_unique<FragmentedRangeTombstoneList>(
std::unique_ptr<InternalIterator>(unfragmented_iter),
comparator_.comparator);
}
}
port::RWMutex* MemTable::GetLock(const Slice& key) {
return &locks_[GetSliceRangedNPHash(key, locks_.size())];
}
@ -885,7 +912,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback, bool* is_blob_index, bool do_merge) {
bool immutable_memtable, ReadCallback* callback,
bool* is_blob_index, bool do_merge) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -895,7 +923,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts,
GetInternalKeySeqno(key.internal_key())));
GetInternalKeySeqno(key.internal_key()),
immutable_memtable));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq =
std::max(*max_covering_tombstone_seq,
@ -977,7 +1006,7 @@ void MemTable::GetFromTable(const LookupKey& key,
}
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback) {
ReadCallback* callback, bool immutable_memtable) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -1024,7 +1053,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (!no_range_del) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIteratorInternal(
read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
immutable_memtable));
iter->max_covering_tombstone_seq = std::max(
iter->max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));

View File

@ -202,8 +202,19 @@ class MemTable {
// those allocated in arena.
InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
// Returns an iterator that yields the range tombstones of the memtable.
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live.
// @param immutable_memtable Whether this memtable is an immutable memtable.
// This information is not stored in memtable itself, so it needs to be
// specified by the caller. This flag is used internally to decide whether a
// cached fragmented range tombstone list can be returned. This cached version
// is constructed when a memtable becomes immutable. Setting the flag to false
// will always yield correct result, but may incur performance penalty as it
// always creates a new fragmented range tombstone list.
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq);
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable);
Status VerifyEncodedEntry(Slice encoded,
const ProtectionInfoKVOS64& kv_prot_info);
@ -244,35 +255,44 @@ class MemTable {
// If do_merge = false then any Merge Operands encountered for key are simply
// stored in merge_context.operands_list and never actually merged to get a
// final value. The raw Merge Operands are eventually returned to the user.
// @param immutable_memtable Whether this memtable is immutable. Used
// internally by NewRangeTombstoneIterator(). See comment above
// NewRangeTombstoneIterator() for more detail.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true) {
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true) {
return Get(key, value, /*timestamp=*/nullptr, s, merge_context,
max_covering_tombstone_seq, seq, read_opts, callback,
is_blob_index, do_merge);
max_covering_tombstone_seq, seq, read_opts, immutable_memtable,
callback, is_blob_index, do_merge);
}
bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true);
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true);
bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true) {
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true) {
SequenceNumber seq;
return Get(key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, &seq, read_opts, callback,
is_blob_index, do_merge);
max_covering_tombstone_seq, &seq, read_opts, immutable_memtable,
callback, is_blob_index, do_merge);
}
// @param immutable_memtable Whether this memtable is immutable. Used
// internally by NewRangeTombstoneIterator(). See comment above
// NewRangeTombstoneIterator() for more detail.
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback);
ReadCallback* callback, bool immutable_memtable);
// If `key` exists in current memtable with type value_type and the existing
// value is at least as large as the new value, updates it in-place. Otherwise
@ -502,6 +522,23 @@ class MemTable {
// Returns a heuristic flush decision
bool ShouldFlushNow();
void ConstructFragmentedRangeTombstones();
// Returns whether a fragmented range tombstone list is already constructed
// for this memtable. It should be constructed right before a memtable is
// added to an immutable memtable list. Note that if a memtable does not have
// any range tombstone, then no range tombstone list will ever be constructed.
// @param allow_empty Specifies whether a memtable with no range tombstone is
// considered to have its fragmented range tombstone list constructed.
bool IsFragmentedRangeTombstonesConstructed(bool allow_empty = true) const {
if (allow_empty) {
return fragmented_range_tombstone_list_.get() != nullptr ||
is_range_del_table_empty_;
} else {
return fragmented_range_tombstone_list_.get() != nullptr;
}
}
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
@ -601,9 +638,18 @@ class MemTable {
MergeContext* merge_context, SequenceNumber* seq,
bool* found_final_value, bool* merge_in_progress);
// Always returns non-null and assumes certain pre-checks are done
// Always returns non-null and assumes certain pre-checks (e.g.,
// is_range_del_table_empty_) are done. This is only valid during the lifetime
// of the underlying memtable.
FragmentedRangeTombstoneIterator* NewRangeTombstoneIteratorInternal(
const ReadOptions& read_options, SequenceNumber read_seq);
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable);
// The fragmented range tombstones of this memtable.
// This is constructed when this memtable becomes immutable
// if !is_range_del_table_empty_.
std::unique_ptr<FragmentedRangeTombstoneList>
fragmented_range_tombstone_list_;
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

View File

@ -119,7 +119,8 @@ void MemTableListVersion::MultiGet(const ReadOptions& read_options,
MultiGetRange* range,
ReadCallback* callback) {
for (auto memtable : memlist_) {
memtable->MultiGet(read_options, range, callback);
memtable->MultiGet(read_options, range, callback,
true /* immutable_memtable */);
if (range->empty()) {
return;
}
@ -130,9 +131,10 @@ bool MemTableListVersion::GetMergeOperands(
const LookupKey& key, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
for (MemTable* memtable : memlist_) {
bool done = memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
merge_context, max_covering_tombstone_seq,
read_opts, nullptr, nullptr, false);
bool done =
memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
merge_context, max_covering_tombstone_seq, read_opts,
true /* immutable_memtable */, nullptr, nullptr, false);
if (done) {
return true;
}
@ -157,11 +159,13 @@ bool MemTableListVersion::GetFromList(
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
assert(memtable->IsFragmentedRangeTombstonesConstructed());
SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, &current_seq,
read_opts, callback, is_blob_index);
bool done =
memtable->Get(key, value, timestamp, s, merge_context,
max_covering_tombstone_seq, &current_seq, read_opts,
true /* immutable_memtable */, callback, is_blob_index);
if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to
@ -194,8 +198,10 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
? read_opts.snapshot->GetSequenceNumber()
: kMaxSequenceNumber;
for (auto& m : memlist_) {
assert(m->IsFragmentedRangeTombstonesConstructed());
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
m->NewRangeTombstoneIterator(read_opts, read_seq));
m->NewRangeTombstoneIterator(read_opts, read_seq,
true /* immutable_memtable */));
range_del_agg->AddTombstones(std::move(range_del_iter));
}
return Status::OK();

View File

@ -267,22 +267,25 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch the newly written keys
merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
/*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(),
false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value1");
merge_context.Clear();
found = mem->Get(LookupKey("key1", 2), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
/*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(),
false /* immutable_memtable */);
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
/*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(),
false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
@ -290,6 +293,9 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ(1, mem->num_deletes());
// Add memtable to list
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
// in MemTableListVersion::GetFromList work.
mem->ConstructFragmentedRangeTombstones();
list.Add(mem, &to_delete);
SequenceNumber saved_seq = seq;
@ -306,6 +312,9 @@ TEST_F(MemTableListTest, GetTest) {
nullptr /* kv_prot_info */));
// Add second memtable to list
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
// in MemTableListVersion::GetFromList work.
mem2->ConstructFragmentedRangeTombstones();
list.Add(mem2, &to_delete);
// Fetch keys via MemTableList
@ -388,19 +397,24 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch the newly written keys
merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
/*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(),
false /* immutable_memtable */);
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value,
/*timestamp*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions());
/*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(),
false /* immutable_memtable */);
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
// Add memtable to list
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
// in MemTableListVersion::GetFromList work.
mem->ConstructFragmentedRangeTombstones();
list.Add(mem, &to_delete);
ASSERT_EQ(0, to_delete.size());
@ -472,6 +486,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
nullptr /* kv_prot_info */));
// Add second memtable to list
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
// in MemTableListVersion::GetFromList work.
mem2->ConstructFragmentedRangeTombstones();
list.Add(mem2, &to_delete);
ASSERT_EQ(0, to_delete.size());
@ -493,6 +510,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
kMaxSequenceNumber, 0 /* column_family_id */);
mem3->Ref();
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
// in MemTableListVersion::GetFromList work.
mem3->ConstructFragmentedRangeTombstones();
list.Add(mem3, &to_delete);
ASSERT_EQ(1, list.NumNotFlushed());
ASSERT_EQ(1, list.NumFlushed());

View File

@ -431,8 +431,8 @@ class Repairer {
auto write_hint = cfd->CalculateSSTWriteHint(0);
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter =
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
auto range_del_iter = mem->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, false /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}

View File

@ -60,7 +60,8 @@ static std::string PrintContents(WriteBatch* b,
arena_iter_guard.set(iter);
} else {
iter = mem->NewRangeTombstoneIterator(ReadOptions(),
kMaxSequenceNumber /* read_seq */);
kMaxSequenceNumber /* read_seq */,
false /* immutable_memtable */);
iter_guard.reset(iter);
}
if (iter == nullptr) {

View File

@ -4186,7 +4186,8 @@ TEST_F(MemTableTest, Simple) {
arena_iter_guard.set(iter);
} else {
iter = GetMemTable()->NewRangeTombstoneIterator(
ReadOptions(), kMaxSequenceNumber /* read_seq */);
ReadOptions(), kMaxSequenceNumber /* read_seq */,
false /* immutable_memtable */);
iter_guard.reset(iter);
}
if (iter == nullptr) {

View File

@ -5099,6 +5099,7 @@ class Benchmark {
int64_t num_written = 0;
int64_t next_seq_db_at = num_ops;
size_t id = 0;
int64_t num_range_deletions = 0;
while ((num_per_key_gen != 0) && !duration.Done(entries_per_batch_)) {
if (duration.GetStage() != stage) {
@ -5296,6 +5297,7 @@ class Benchmark {
(num_written - writes_before_delete_range_) %
writes_per_range_tombstone_ ==
0) {
num_range_deletions++;
int64_t begin_num = key_gens[id]->Next();
if (FLAGS_expand_range_tombstones) {
for (int64_t offset = 0; offset < range_tombstone_width_;
@ -5401,6 +5403,10 @@ class Benchmark {
".\nNumber of 'disposable entry delete': %" PRIu64 "\n",
num_written, num_selective_deletes);
}
if (num_range_deletions > 0) {
std::cout << "Number of range deletions: " << num_range_deletions
<< std::endl;
}
thread->stats.AddBytes(bytes);
}