Less linear search in DBIter::Seek() when keys are overwritten a lot

Summary:
In one deployment we saw high latencies (presumably from slow iterator operations) and a lot of CPU time reported by perf with this stack:

```
  rocksdb::MergingIterator::Next
  rocksdb::DBIter::FindNextUserEntryInternal
  rocksdb::DBIter::Seek
```

I think what's happening is:
1. we create a snapshot iterator,
2. we do lots of Put()s for the same key x; this creates lots of entries in memtable,
3. we seek the iterator to a key slightly smaller than x,
4. the seek walks over lots of entries in memtable for key x, skipping them because of high sequence numbers.

CC IslamAbdelRahman
Closes https://github.com/facebook/rocksdb/pull/1413

Differential Revision: D4083879

Pulled By: IslamAbdelRahman

fbshipit-source-id: a83ddae
This commit is contained in:
Mike Kolupaev 2016-11-28 10:12:28 -08:00 committed by Facebook Github Bot
parent cd7c4143d7
commit 236d4c67e9
6 changed files with 210 additions and 72 deletions

1
.gitignore vendored
View File

@ -37,6 +37,7 @@ util/build_version.cc
build_tools/VALGRIND_LOGS/
coverage/COVERAGE_REPORT
.gdbhistory
.gdb_history
package/
.phutil_module_cache
unity.a

View File

@ -339,7 +339,7 @@ void DBIter::Next() {
// saved_value_ => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
// a delete marker
// a delete marker or a sequence number higher than sequence_
//
// The prefix_check parameter controls whether we check the iterated
// keys against the prefix of the seeked key. Set to false when
@ -356,90 +356,137 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
assert(iter_->Valid());
assert(direction_ == kForward);
current_entry_is_merged_ = false;
// How many times in a row we have skipped an entry with user key less than
// or equal to saved_key_. We could skip these entries either because
// sequence numbers were too high or because skipping = true.
// What saved_key_ contains throughout this method:
// - if skipping : saved_key_ contains the key that we need to skip,
// and we haven't seen any keys greater than that,
// - if num_skipped > 0 : saved_key_ contains the key that we have skipped
// num_skipped times, and we haven't seen any keys
// greater than that,
// - none of the above : saved_key_ can contain anything, it doesn't matter.
uint64_t num_skipped = 0;
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey)) {
if (iterate_upper_bound_ != nullptr &&
user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
break;
}
if (!ParseKey(&ikey)) {
// Skip corrupted keys.
iter_->Next();
continue;
}
if (prefix_extractor_ && prefix_check &&
prefix_extractor_->Transform(ikey.user_key).compare(prefix_start_key_) != 0) {
break;
}
if (iterate_upper_bound_ != nullptr &&
user_comparator_->Compare(ikey.user_key, *iterate_upper_bound_) >= 0) {
break;
}
if (ikey.sequence <= sequence_) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
switch (ikey.type) {
case kTypeDeletion:
case kTypeSingleDeletion:
if (prefix_extractor_ && prefix_check &&
prefix_extractor_->Transform(ikey.user_key)
.compare(prefix_start_key_) != 0) {
break;
}
if (ikey.sequence <= sequence_) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
skipping = false; // ikey > saved_key_, i.e. saved_key_ is skipped
num_skipped = 0;
switch (ikey.type) {
case kTypeDeletion:
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (range_del_agg_.ShouldDelete(ikey)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (range_del_agg_.ShouldDelete(ikey)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
valid_ = true;
return;
}
break;
case kTypeMerge:
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (range_del_agg_.ShouldDelete(ikey)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
// By now, we are sure the current ikey is going to yield a
// value
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
}
break;
default:
assert(false);
break;
}
} else {
valid_ = true;
return;
}
break;
case kTypeMerge:
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (range_del_agg_.ShouldDelete(ikey)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else {
// By now, we are sure the current ikey is going to yield a
// value
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
}
break;
default:
assert(false);
break;
}
}
} else {
// This key was inserted after our snapshot was taken.
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
// Here saved_key_ may contain some old key, or the default empty key, or
// key assigned by some random other method. We don't care.
if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++;
} else {
saved_key_.SetKey(
ikey.user_key,
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
skipping = false;
num_skipped = 0;
}
}
// If we have sequentially iterated via numerous keys and still not
// found the next user-key, then it is better to seek so that we can
// avoid too many key comparisons. We seek to the last occurrence of
// our current key by looking for sequence number 0 and type deletion
// (the smallest type).
if (skipping && num_skipped > max_skip_) {
// If we have sequentially iterated via numerous equal keys, then it's
// better to seek so that we can avoid too many key comparisons.
if (num_skipped > max_skip_) {
num_skipped = 0;
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
kTypeDeletion));
if (skipping) {
// We're looking for the next user-key but all we see are the same
// user-key with decreasing sequence numbers. Fast forward to
// sequence number 0 and type deletion (the smallest type).
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
kTypeDeletion));
// Don't set skipping = false because we may still see more user-keys
// equal to saved_key_.
} else {
// We saw multiple entries with this user key and sequence numbers
// higher than sequence_. Fast forward to sequence_.
// Note that this only covers a case when a higher key was overwritten
// many times since our snapshot was taken, not the case when a lot of
// different keys were inserted after our snapshot was taken.
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetKey(), sequence_,
kValueTypeForSeek));
}
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
} else {
@ -503,6 +550,7 @@ void DBIter::MergeValuesNewToOld() {
// when complete, add result to operands and continue.
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else {
assert(false);
}
@ -564,6 +612,11 @@ void DBIter::ReverseToBackward() {
FindParseableKey(&ikey, kReverse);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) > 0) {
if (ikey.sequence > sequence_) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
iter_->Prev();
FindParseableKey(&ikey, kReverse);
}
@ -676,6 +729,7 @@ bool DBIter::FindValueForCurrentKey() {
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(
iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
}
break;
default:
@ -760,6 +814,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
ikey.type == kTypeMerge && !range_del_agg_.ShouldDelete(ikey)) {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
iter_->Next();
FindParseableKey(&ikey, kForward);
}
@ -830,6 +885,11 @@ void DBIter::FindPrevUserKey() {
++num_skipped;
}
}
if (ikey.sequence > sequence_) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
iter_->Prev();
FindParseableKey(&ikey, kReverse);
}

View File

@ -8,10 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/perf_context.h"
#include "port/port.h"
namespace rocksdb {
@ -1778,6 +1778,62 @@ TEST_F(DBIteratorTest, ReadAhead) {
delete iter;
}
// Insert a key, create a snapshot iterator, overwrite key lots of times,
// seek to a smaller key. Expect DBIter to fall back to a seek instead of
// going through all the overwrites linearly.
TEST_F(DBIteratorTest, DBIteratorSkipRecentDuplicatesTest) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.max_sequential_skip_in_iterations = 3;
options.prefix_extractor = nullptr;
options.write_buffer_size = 1 << 27; // big enough to avoid flush
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
// Insert.
ASSERT_OK(Put("b", "0"));
// Create iterator.
ReadOptions ro;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
// Insert a lot.
for (int i = 0; i < 100; ++i) {
ASSERT_OK(Put("b", std::to_string(i + 1).c_str()));
}
#ifndef ROCKSDB_LITE
// Check that memtable wasn't flushed.
std::string val;
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &val));
EXPECT_EQ("0", val);
#endif
// Seek iterator to a smaller key.
perf_context.Reset();
iter->Seek("a");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b", iter->key().ToString());
EXPECT_EQ("0", iter->value().ToString());
// Check that the seek didn't do too much work.
// Checks are not tight, just make sure that everything is well below 100.
EXPECT_LT(perf_context.internal_key_skipped_count, 4);
EXPECT_LT(perf_context.internal_recent_skipped_count, 8);
EXPECT_LT(perf_context.seek_on_memtable_count, 10);
EXPECT_LT(perf_context.next_on_memtable_count, 10);
EXPECT_LT(perf_context.prev_on_memtable_count, 10);
// Check that iterator did something like what we expect.
EXPECT_EQ(perf_context.internal_delete_skipped_count, 0);
EXPECT_EQ(perf_context.internal_merge_count, 0);
EXPECT_GE(perf_context.internal_recent_skipped_count, 2);
EXPECT_GE(perf_context.seek_on_memtable_count, 2);
EXPECT_EQ(1, options.statistics->getTickerCount(
NUMBER_OF_RESEEKS_IN_ITERATION));
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -322,11 +322,13 @@ class MemTableIterator : public InternalIterator {
valid_ = iter_->Valid();
}
virtual void Next() override {
PERF_COUNTER_ADD(next_on_memtable_count, 1);
assert(Valid());
iter_->Next();
valid_ = iter_->Valid();
}
virtual void Prev() override {
PERF_COUNTER_ADD(prev_on_memtable_count, 1);
assert(Valid());
iter_->Prev();
valid_ = iter_->Valid();

View File

@ -45,8 +45,7 @@ struct PerfContext {
// tombstones are not included in this counter, while previous updates
// hidden by the tombstones will be included here.
// 4. symmetric cases for Prev() and SeekToLast()
// We sometimes also skip entries of more recent updates than the snapshot
// we read from, but they are not included in this counter.
// internal_recent_skipped_count is not included in this counter.
//
uint64_t internal_key_skipped_count;
// Total number of deletes and single deletes skipped over during iteration
@ -57,6 +56,13 @@ struct PerfContext {
// still older updates invalidated by the tombstones.
//
uint64_t internal_delete_skipped_count;
// How many times iterators skipped over internal keys that are more recent
// than the snapshot that iterator is using.
//
uint64_t internal_recent_skipped_count;
// How many values were fed into merge operator by iterators.
//
uint64_t internal_merge_count;
uint64_t get_snapshot_time; // total nanos spent on getting snapshot
uint64_t get_from_memtable_time; // total nanos spent on querying memtables
@ -67,7 +73,12 @@ struct PerfContext {
// total nanos spent on seeking memtable
uint64_t seek_on_memtable_time;
// number of seeks issued on memtable
// (including SeekForPrev but not SeekToFirst and SeekToLast)
uint64_t seek_on_memtable_count;
// number of Next()s issued on memtable
uint64_t next_on_memtable_count;
// number of Prev()s issued on memtable
uint64_t prev_on_memtable_count;
// total nanos spent on seeking child iters
uint64_t seek_child_seek_time;
// number of seek issued in child iterators

View File

@ -28,6 +28,8 @@ void PerfContext::Reset() {
block_decompress_time = 0;
internal_key_skipped_count = 0;
internal_delete_skipped_count = 0;
internal_recent_skipped_count = 0;
internal_merge_count = 0;
write_wal_time = 0;
get_snapshot_time = 0;
@ -37,6 +39,8 @@ void PerfContext::Reset() {
get_from_output_files_time = 0;
seek_on_memtable_time = 0;
seek_on_memtable_count = 0;
next_on_memtable_count = 0;
prev_on_memtable_count = 0;
seek_child_seek_time = 0;
seek_child_seek_count = 0;
seek_min_heap_time = 0;
@ -80,6 +84,8 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(block_decompress_time);
PERF_CONTEXT_OUTPUT(internal_key_skipped_count);
PERF_CONTEXT_OUTPUT(internal_delete_skipped_count);
PERF_CONTEXT_OUTPUT(internal_recent_skipped_count);
PERF_CONTEXT_OUTPUT(internal_merge_count);
PERF_CONTEXT_OUTPUT(write_wal_time);
PERF_CONTEXT_OUTPUT(get_snapshot_time);
PERF_CONTEXT_OUTPUT(get_from_memtable_time);
@ -88,6 +94,8 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(get_from_output_files_time);
PERF_CONTEXT_OUTPUT(seek_on_memtable_time);
PERF_CONTEXT_OUTPUT(seek_on_memtable_count);
PERF_CONTEXT_OUTPUT(next_on_memtable_count);
PERF_CONTEXT_OUTPUT(prev_on_memtable_count);
PERF_CONTEXT_OUTPUT(seek_child_seek_time);
PERF_CONTEXT_OUTPUT(seek_child_seek_count);
PERF_CONTEXT_OUTPUT(seek_min_heap_time);