WritePrepared Txn: fix compaction filter snapshot checks

Summary:
Add a snapshot_checker check wherever we compare a key's sequence number against snapshots to decide what to do with an input key. The changes cover the following cases (the shared check pattern is sketched just before the diff below):
* compaction filter
* single delete
* delete at bottom level
* merge
Closes https://github.com/facebook/rocksdb/pull/3251

Differential Revision: D6537850

Pulled By: yiwu-arbug

fbshipit-source-id: 3faba40ed5e37779f4a0cb7ae78af9546659c7f2
Author: Yi Wu (2017-12-12 11:02:07 -08:00); committed by Facebook Github Bot
parent a9c8d4ef15
commit e3a06f12d2
1 changed file with 25 additions and 5 deletions
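
Every hunk in the diff below applies the same idea. Under WritePrepared transactions a key's sequence number is allocated at prepare time, so `seq <= snapshot` alone no longer proves that the key is visible to that snapshot; the commit may have happened after the snapshot was taken. The compaction iterator therefore also consults `snapshot_checker_`, when one is installed, before treating a key as covered by a snapshot. A minimal sketch of the combined predicate, assuming a `SnapshotChecker` interface with a boolean `IsInSnapshot(seq, snapshot)` as used in the diff; the helper name `KeyVisibleToSnapshot` is illustrative, not RocksDB code:

```cpp
#include <cstdint>

using SequenceNumber = uint64_t;

// Assumed shape of the checker used in the diff: answers whether `seq` was
// committed before the snapshot with sequence number `snapshot`.
class SnapshotChecker {
 public:
  virtual ~SnapshotChecker() = default;
  virtual bool IsInSnapshot(SequenceNumber seq,
                            SequenceNumber snapshot) const = 0;
};

// Illustrative helper (not in RocksDB): the predicate the patch enforces at
// each call site. A key is treated as visible to `snapshot` only if its
// sequence number is numerically covered AND the checker, when present,
// confirms the commit happened before the snapshot was taken.
inline bool KeyVisibleToSnapshot(SequenceNumber seq, SequenceNumber snapshot,
                                 const SnapshotChecker* checker) {
  return seq <= snapshot &&
         (checker == nullptr || checker->IsInSnapshot(seq, snapshot));
}
```

Where the old code tested `sequence > snapshot` the diff adds the negated form, `UNLIKELY(!snapshot_checker_->IsInSnapshot(...))`; where it tested `sequence <= snapshot` it adds the positive form, `LIKELY(snapshot_checker_->IsInSnapshot(...))`, reflecting that most keys are expected to be committed.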

@@ -184,8 +184,11 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                               Slice* skip_until) {
   if (compaction_filter_ != nullptr &&
       (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
-      (visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
-       ignore_snapshots_)) {
+      (visible_at_tip_ || ignore_snapshots_ ||
+       ikey_.sequence > latest_snapshot_ ||
+       (snapshot_checker_ != nullptr &&
+        UNLIKELY(!snapshot_checker_->IsInSnapshot(ikey_.sequence,
+                                                  latest_snapshot_))))) {
     // If the user has specified a compaction filter and the sequence
     // number is greater than any external snapshot, then invoke the
     // filter. If the return value of the compaction filter is true,
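
The guard above decides whether the compaction filter may see this key at all: apart from the visible_at_tip_ and ignore_snapshots_ shortcuts, the filter only runs on versions that no external snapshot can still read. Restated with plain booleans (a sketch with illustrative names; `checker_confirms_commit` stands for `snapshot_checker_->IsInSnapshot(ikey_.sequence, latest_snapshot_)`):

```cpp
// Sketch of the new filter-invocation guard from the hunk above.
bool ShouldInvokeCompactionFilter(bool has_filter, bool is_value_or_blob_index,
                                  bool visible_at_tip, bool ignore_snapshots,
                                  bool seq_above_latest_snapshot,
                                  bool has_checker,
                                  bool checker_confirms_commit) {
  // New case added by the patch: even when the sequence number is numerically
  // covered by latest_snapshot_, the checker may report the key as not yet
  // committed for that snapshot, so no snapshot can actually read it and the
  // filter is allowed to run.
  const bool hidden_from_latest_snapshot =
      seq_above_latest_snapshot || (has_checker && !checker_confirms_commit);
  return has_filter && is_value_or_blob_index &&
         (visible_at_tip || ignore_snapshots || hidden_from_latest_snapshot);
}
```
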
@@ -411,7 +414,10 @@ void CompactionIterator::NextFromInput() {
           cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
         // Check whether the next key belongs to the same snapshot as the
         // SingleDelete.
-        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
+        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
+            (snapshot_checker_ != nullptr &&
+             UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
+                                                       prev_snapshot)))) {
           if (next_ikey.type == kTypeSingleDeletion) {
             // We encountered two SingleDeletes in a row. This could be due to
             // unexpected user input.
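
This branch asks whether the key following the SingleDelete falls into the same snapshot stripe, i.e. whether no snapshot separates the two records. The patch adds the case where the next key's sequence number is numerically below `prev_snapshot` but the checker reports it as not committed for that snapshot. A self-contained toy example with made-up sequence numbers and a stub checker (none of this is RocksDB code):

```cpp
#include <cassert>
#include <cstdint>
#include <set>

using SequenceNumber = uint64_t;

// Toy stand-in for the checker, used in this example only: sequences listed
// here are treated as committed before the snapshot, everything else as not.
struct StubSnapshotChecker {
  std::set<SequenceNumber> committed_before_snapshot;
  bool IsInSnapshot(SequenceNumber seq, SequenceNumber /*snapshot*/) const {
    return committed_before_snapshot.count(seq) > 0;
  }
};

int main() {
  const SequenceNumber prev_snapshot = 100;  // snapshot just below the stripe
  const SequenceNumber next_seq = 90;        // value covered by the SingleDelete

  // Pre-patch test: 90 <= 100, so the value looks like it belongs to an older
  // stripe and the SingleDelete has to be kept for it.
  assert(!(next_seq > prev_snapshot));

  // WritePrepared view: sequence 90 was allocated at prepare time, but the
  // transaction committed only after snapshot 100 was taken, so the checker
  // reports it as invisible to that snapshot -- same stripe after all.
  StubSnapshotChecker checker;  // 90 deliberately not listed as committed
  const bool same_stripe =
      prev_snapshot == 0 || next_seq > prev_snapshot ||
      !checker.IsInSnapshot(next_seq, prev_snapshot);
  assert(same_stripe);
  return 0;
}
```
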
@@ -422,8 +428,12 @@ void CompactionIterator::NextFromInput() {
             // input_->Next().
             ++iter_stats_.num_record_drop_obsolete;
             ++iter_stats_.num_single_del_mismatch;
-          } else if ((ikey_.sequence <= earliest_write_conflict_snapshot_) ||
-                     has_outputted_key_) {
+          } else if (has_outputted_key_ ||
+                     (ikey_.sequence <= earliest_write_conflict_snapshot_ &&
+                      (snapshot_checker_ == nullptr ||
+                       LIKELY(snapshot_checker_->IsInSnapshot(
+                           ikey_.sequence,
+                           earliest_write_conflict_snapshot_))))) {
             // Found a matching value, we can drop the single delete and the
             // value. It is safe to drop both records since we've already
             // outputted a key in this snapshot, or there is no earlier
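
Once the SingleDelete and its value are known to share a stripe, this condition decides whether both records can be dropped. The patch tightens the write-conflict case: coverage by `earliest_write_conflict_snapshot_` now also requires the checker's confirmation, and `has_outputted_key_` moves to the front so that it short-circuits the snapshot arithmetic, presumably to skip a needless checker query. A boolean restatement (illustrative names; `checker_confirms_commit` stands for `snapshot_checker_->IsInSnapshot(ikey_.sequence, earliest_write_conflict_snapshot_)`):

```cpp
#include <cstdint>

using SequenceNumber = uint64_t;

// Sketch of the updated "safe to drop SingleDelete + value" test from the
// hunk above (names are illustrative, not RocksDB code).
bool CanDropSingleDeleteAndValue(bool has_outputted_key, SequenceNumber seq,
                                 SequenceNumber earliest_write_conflict_snapshot,
                                 bool has_checker, bool checker_confirms_commit) {
  return has_outputted_key ||
         (seq <= earliest_write_conflict_snapshot &&
          // New: numeric coverage alone is not enough under WritePrepared.
          (!has_checker || checker_confirms_commit));
}
```
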
@@ -471,6 +481,9 @@ void CompactionIterator::NextFromInput() {
         // comparison, so the value of has_current_user_key does not matter.
         has_current_user_key_ = false;
         if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
+            (snapshot_checker_ == nullptr ||
+             LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
+                                                    earliest_snapshot_))) &&
             compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                        &level_ptrs_)) {
           // Key doesn't exist outside of this range.
@@ -504,6 +517,9 @@ void CompactionIterator::NextFromInput() {
       input_->Next();
     } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
                ikey_.sequence <= earliest_snapshot_ &&
+               (snapshot_checker_ == nullptr ||
+                LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
+                                                       earliest_snapshot_))) &&
                ikeyNotNeededForIncrementalSnapshot() &&
                compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                           &level_ptrs_)) {
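
The two hunks above add an identical guard to the two bottom-level fast paths: a key whose user key does not exist beyond the output level, and a deletion that can be dropped outright. In both, the key may only be treated as covered by `earliest_snapshot_` if the checker, when present, agrees; otherwise the record is conservatively kept. A small sketch of that property (illustrative names; `checker_says_committed` stands for `snapshot_checker_->IsInSnapshot(ikey_.sequence, earliest_snapshot_)`):

```cpp
#include <cassert>

// Sketch of the guard added identically in the two hunks above.
bool CoveredByEarliestSnapshot(bool seq_at_or_below_earliest_snapshot,
                               bool has_checker, bool checker_says_committed) {
  return seq_at_or_below_earliest_snapshot &&
         (!has_checker || checker_says_committed);
}

int main() {
  // Without a checker (non-WritePrepared DBs) behavior is unchanged.
  assert(CoveredByEarliestSnapshot(true, false, false));
  // With a checker, a key not yet committed for the earliest snapshot is
  // conservatively kept instead of being dropped or optimized away.
  assert(!CoveredByEarliestSnapshot(true, true, false));
  return 0;
}
```
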
@@ -538,6 +554,10 @@ void CompactionIterator::NextFromInput() {
       //    have hit (A)
       // We encapsulate the merge related state machine in a different
       // object to minimize change to the existing flow.
+      // In case snapshot_checker is present, we can probably merge further
+      // beyond prev_snapshot, since there could be more keys with sequence
+      // smaller than prev_snapshot, but reported by snapshot_checker as not
+      // visible by prev_snapshot. But it will make the logic more complicated.
       Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
                                            prev_snapshot, bottommost_level_);
       merge_out_iter_.SeekToFirst();
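
The comment added above records a deliberately skipped optimization: `MergeUntil` stops at the `prev_snapshot` boundary using the plain numeric comparison, even though under WritePrepared some operands with smaller sequence numbers may be invisible to that snapshot and could, in principle, be merged as well. A toy illustration of that gap (made-up numbers, not RocksDB code):

```cpp
#include <cassert>
#include <cstdint>

using SequenceNumber = uint64_t;

int main() {
  const SequenceNumber prev_snapshot = 100;
  const SequenceNumber next_operand_seq = 95;   // next merge operand
  const bool visible_to_prev_snapshot = false;  // checker: committed after 100

  // Boundary used by the patch: purely numeric, so merging stops here.
  const bool merge_stops_here = next_operand_seq <= prev_snapshot;

  // Boundary the comment hints at: the operand is not actually readable
  // through prev_snapshot, so a checker-aware MergeUntil could keep going.
  const bool could_merge_further = merge_stops_here && !visible_to_prev_snapshot;

  assert(merge_stops_here && could_merge_further);
  return 0;
}
```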