From e3a06f12d27fd50af7b6c5941973f529601f9a3e Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 12 Dec 2017 11:02:07 -0800 Subject: [PATCH] WritePrepared Txn: fix compaction filter snapshot checks Summary: Add snapshot_checker check whenever we need to check sequence against snapshots and decide what to do with an input key. The changes are related to one of: * compaction filter * single delete * delete at bottom level * merge Closes https://github.com/facebook/rocksdb/pull/3251 Differential Revision: D6537850 Pulled By: yiwu-arbug fbshipit-source-id: 3faba40ed5e37779f4a0cb7ae78af9546659c7f2 --- db/compaction_iterator.cc | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index dfda5e71f1..613e40a8ee 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -184,8 +184,11 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { if (compaction_filter_ != nullptr && (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) && - (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || - ignore_snapshots_)) { + (visible_at_tip_ || ignore_snapshots_ || + ikey_.sequence > latest_snapshot_ || + (snapshot_checker_ != nullptr && + UNLIKELY(!snapshot_checker_->IsInSnapshot(ikey_.sequence, + latest_snapshot_))))) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, @@ -411,7 +414,10 @@ void CompactionIterator::NextFromInput() { cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { // Check whether the next key belongs to the same snapshot as the // SingleDelete. - if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) { + if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot || + (snapshot_checker_ != nullptr && + UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence, + prev_snapshot)))) { if (next_ikey.type == kTypeSingleDeletion) { // We encountered two SingleDeletes in a row. This could be due to // unexpected user input. @@ -422,8 +428,12 @@ void CompactionIterator::NextFromInput() { // input_->Next(). ++iter_stats_.num_record_drop_obsolete; ++iter_stats_.num_single_del_mismatch; - } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) || - has_outputted_key_) { + } else if (has_outputted_key_ || + (ikey_.sequence <= earliest_write_conflict_snapshot_ && + (snapshot_checker_ == nullptr || + LIKELY(snapshot_checker_->IsInSnapshot( + ikey_.sequence, + earliest_write_conflict_snapshot_))))) { // Found a matching value, we can drop the single delete and the // value. It is safe to drop both records since we've already // outputted a key in this snapshot, or there is no earlier @@ -471,6 +481,9 @@ void CompactionIterator::NextFromInput() { // comparison, so the value of has_current_user_key does not matter. has_current_user_key_ = false; if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ && + (snapshot_checker_ == nullptr || + LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, + earliest_snapshot_))) && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, &level_ptrs_)) { // Key doesn't exist outside of this range. @@ -504,6 +517,9 @@ void CompactionIterator::NextFromInput() { input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && ikey_.sequence <= earliest_snapshot_ && + (snapshot_checker_ == nullptr || + LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, + earliest_snapshot_))) && ikeyNotNeededForIncrementalSnapshot() && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, &level_ptrs_)) { @@ -538,6 +554,10 @@ void CompactionIterator::NextFromInput() { // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. + // In case snapshot_checker is present, we can probably merge further + // beyond prev_snapshot, since there could be more keys with sequence + // smaller than prev_snapshot, but reported by snapshot_checker as not + // visible by prev_snapshot. But it will make the logic more complicated. Status s = merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot, bottommost_level_); merge_out_iter_.SeekToFirst();