diff --git a/HISTORY.md b/HISTORY.md index d2f48e918b..0afa698b4a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Change * Support dynamically change `stats_dump_period_sec` option via SetDBOptions(). +* Added ReadOptions::max_skippable_internal_keys to set a threshold to fail a request as incomplete when too many keys are being skipped when using iterators. ### New Features * Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user. diff --git a/db/db_impl.cc b/db/db_impl.cc index b590a2b076..ce38b471d3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4442,7 +4442,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, read_options.iterate_upper_bound, - read_options.prefix_same_as_start, read_options.pin_data); + read_options.prefix_same_as_start, read_options.pin_data, + read_options.total_order_seek, + read_options.max_skippable_internal_keys); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); @@ -4501,7 +4503,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, read_options.iterate_upper_bound, read_options.prefix_same_as_start, read_options.pin_data, - read_options.total_order_seek); + read_options.total_order_seek, + read_options.max_skippable_internal_keys); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), @@ -4553,7 +4556,9 @@ Status DBImpl::NewIterators( env_, *cfd->ioptions(), cfd->user_comparator(), iter, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, nullptr, false, read_options.pin_data)); + sv->version_number, nullptr, false, read_options.pin_data, + read_options.total_order_seek, + read_options.max_skippable_internal_keys)); } #endif } else { @@ -4573,7 +4578,9 @@ Status DBImpl::NewIterators( ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, *cfd->ioptions(), cfd->user_comparator(), snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, nullptr, false, read_options.pin_data); + sv->version_number, nullptr, false, read_options.pin_data, + read_options.total_order_seek, + read_options.max_skippable_internal_keys); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), db_iter->GetRangeDelAggregator()); @@ -5160,7 +5167,6 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, env_->SleepForMicroseconds(kDelayInterval); } mutex_.Lock(); - } while (bg_error_.ok() && write_controller_.IsStopped()) { diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index bf4c4b37f0..7537e4f4c4 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -65,7 +65,9 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, ->number_ : latest_snapshot), super_version->mutable_cf_options.max_sequential_skip_in_iterations, - super_version->version_number); + super_version->version_number, read_options.iterate_upper_bound, + read_options.prefix_same_as_start, read_options.pin_data, + read_options.total_order_seek, read_options.max_skippable_internal_keys); auto internal_iter = NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), db_iter->GetRangeDelAggregator()); @@ -94,7 +96,10 @@ Status DBImplReadOnly::NewIterators( ->number_ : latest_snapshot), sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number); + sv->version_number, read_options.iterate_upper_bound, + read_options.prefix_same_as_start, read_options.pin_data, + read_options.total_order_seek, + read_options.max_skippable_internal_keys); auto* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), db_iter->GetRangeDelAggregator()); diff --git a/db/db_iter.cc b/db/db_iter.cc index e6723275a7..d2a0fbbd05 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -106,7 +106,8 @@ class DBIter: public Iterator { uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, bool prefix_same_as_start = false, bool pin_data = false, - bool total_order_seek = false) + bool total_order_seek = false, + uint64_t max_skippable_internal_keys = 0) : arena_mode_(arena_mode), env_(env), logger_(ioptions.info_log), @@ -128,6 +129,7 @@ class DBIter: public Iterator { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; + max_skippable_internal_keys_ = max_skippable_internal_keys; if (pin_thru_lifetime_) { pinned_iters_mgr_.StartPinning(); } @@ -224,6 +226,7 @@ class DBIter: public Iterator { void FindNextUserEntryInternal(bool skipping, bool prefix_check); bool ParseKey(ParsedInternalKey* key); void MergeValuesNewToOld(); + bool TooManyInternalKeysSkipped(bool increment = true); // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called @@ -249,6 +252,10 @@ class DBIter: public Iterator { } } + inline void ResetInternalKeysSkippedCounter() { + num_internal_keys_skipped_ = 0; + } + const SliceTransform* prefix_extractor_; bool arena_mode_; Env* const env_; @@ -268,6 +275,8 @@ class DBIter: public Iterator { // for prefix seek mode to support prev() Statistics* statistics_; uint64_t max_skip_; + uint64_t max_skippable_internal_keys_; + uint64_t num_internal_keys_skipped_; uint64_t version_number_; const Slice* iterate_upper_bound_; IterKey prefix_start_buf_; @@ -304,6 +313,7 @@ void DBIter::Next() { // Release temporarily pinned blocks from last operation ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); if (direction_ == kReverse) { ReverseToForward(); } else if (iter_->Valid() && !current_entry_is_merged_) { @@ -390,6 +400,10 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { break; } + if (TooManyInternalKeysSkipped()) { + return; + } + if (ikey.sequence <= sequence_) { if (skipping && user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { @@ -580,6 +594,7 @@ void DBIter::MergeValuesNewToOld() { void DBIter::Prev() { assert(valid_); ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); if (direction_ == kForward) { ReverseToBackward(); } @@ -658,6 +673,7 @@ void DBIter::PrevInternal() { while (iter_->Valid()) { saved_key_.SetKey(ExtractUserKey(iter_->key()), !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); + if (FindValueForCurrentKey()) { valid_ = true; if (!iter_->Valid()) { @@ -674,6 +690,11 @@ void DBIter::PrevInternal() { } return; } + + if (TooManyInternalKeysSkipped(false)) { + return; + } + if (!iter_->Valid()) { break; } @@ -709,6 +730,10 @@ bool DBIter::FindValueForCurrentKey() { size_t num_skipped = 0; while (iter_->Valid() && ikey.sequence <= sequence_ && user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) { + if (TooManyInternalKeysSkipped()) { + return false; + } + // We iterate too much: let's use Seek() to avoid too much key comparisons if (num_skipped >= max_skip_) { return FindValueForCurrentKeyUsingSeek(); @@ -908,6 +933,10 @@ void DBIter::FindPrevUserKey() { while (iter_->Valid() && ((cmp = user_comparator_->Compare( ikey.user_key, saved_key_.GetKey())) == 0 || (cmp > 0 && ikey.sequence > sequence_))) { + if (TooManyInternalKeysSkipped()) { + return; + } + if (cmp == 0) { if (num_skipped >= max_skip_) { num_skipped = 0; @@ -930,6 +959,18 @@ void DBIter::FindPrevUserKey() { } } +bool DBIter::TooManyInternalKeysSkipped(bool increment) { + if ((max_skippable_internal_keys_ > 0) && + (num_internal_keys_skipped_ > max_skippable_internal_keys_)) { + valid_ = false; + status_ = Status::Incomplete("Too many internal keys skipped."); + return true; + } else if (increment) { + num_internal_keys_skipped_++; + } + return false; +} + // Skip all unparseable keys void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { while (iter_->Valid() && !ParseKey(ikey)) { @@ -944,6 +985,7 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { void DBIter::Seek(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); saved_key_.Clear(); saved_key_.SetInternalKey(target, sequence_); @@ -985,6 +1027,7 @@ void DBIter::Seek(const Slice& target) { void DBIter::SeekForPrev(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); saved_key_.Clear(); // now saved_key is used to store internal key. saved_key_.SetInternalKey(target, 0 /* sequence_number */, @@ -1030,6 +1073,7 @@ void DBIter::SeekToFirst() { } direction_ = kForward; ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); ClearSavedValue(); { @@ -1066,6 +1110,7 @@ void DBIter::SeekToLast() { } direction_ = kReverse; ReleaseTempPinnedData(); + ResetInternalKeysSkippedCounter(); ClearSavedValue(); { @@ -1105,11 +1150,13 @@ Iterator* NewDBIterator( const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound, - bool prefix_same_as_start, bool pin_data, bool total_order_seek) { - DBIter* db_iter = new DBIter( - env, ioptions, user_key_comparator, internal_iter, sequence, false, - max_sequential_skip_in_iterations, version_number, iterate_upper_bound, - prefix_same_as_start, pin_data, total_order_seek); + bool prefix_same_as_start, bool pin_data, bool total_order_seek, + uint64_t max_skippable_internal_keys) { + DBIter* db_iter = + new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence, + false, max_sequential_skip_in_iterations, version_number, + iterate_upper_bound, prefix_same_as_start, pin_data, + total_order_seek, max_skippable_internal_keys); return db_iter; } @@ -1153,14 +1200,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( const Comparator* user_key_comparator, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound, bool prefix_same_as_start, bool pin_data, - bool total_order_seek) { + bool total_order_seek, uint64_t max_skippable_internal_keys) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); Arena* arena = iter->GetArena(); auto mem = arena->AllocateAligned(sizeof(DBIter)); - DBIter* db_iter = new (mem) DBIter( - env, ioptions, user_key_comparator, nullptr, sequence, true, - max_sequential_skip_in_iterations, version_number, iterate_upper_bound, - prefix_same_as_start, pin_data, total_order_seek); + DBIter* db_iter = + new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence, + true, max_sequential_skip_in_iterations, version_number, + iterate_upper_bound, prefix_same_as_start, pin_data, + total_order_seek, max_skippable_internal_keys); iter->SetDBIter(db_iter); diff --git a/db/db_iter.h b/db/db_iter.h index ee77552211..cab42b8460 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -33,7 +33,7 @@ extern Iterator* NewDBIterator( const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, bool prefix_same_as_start = false, bool pin_data = false, - bool total_order_seek = false); + bool total_order_seek = false, uint64_t max_skippable_internal_keys = 0); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -82,6 +82,6 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, bool prefix_same_as_start = false, bool pin_data = false, - bool total_order_seek = false); + bool total_order_seek = false, uint64_t max_skippable_internal_keys = 0); } // namespace rocksdb diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index f6492bd3a3..7dd5dfd00d 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -875,6 +875,378 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) { } } +TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { + Options options; + ReadOptions ro; + + // Basic test case ... Make sure explicityly passing the default value works. + // Skipping internal keys is disabled by default, when the value is 0. + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddDeletion("c"); + internal_iter->AddPut("d", "val_d"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 0; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "val_d"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().ok()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "val_d"); + + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().ok()); + } + + // Test to make sure that the request will *not* fail as incomplete if + // num_internal_keys_skipped is *equal* to max_skippable_internal_keys + // threshold. (It will fail as incomplete only when the threshold is + // exceeded.) + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().ok()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Prev(); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().ok()); + } + + // Fail the request as incomplete when num_internal_keys_skipped > + // max_skippable_internal_keys + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + + // Test that the num_internal_keys_skipped counter resets after a successful + // read. + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddDeletion("d"); + internal_iter->AddDeletion("d"); + internal_iter->AddDeletion("d"); + internal_iter->AddPut("e", "val_e"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Next(); // num_internal_keys_skipped counter resets here. + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + + // Test that the num_internal_keys_skipped counter resets after a successful + // read. + // Reverse direction + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddDeletion("d"); + internal_iter->AddDeletion("d"); + internal_iter->AddPut("e", "val_e"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "e"); + ASSERT_EQ(db_iter->value().ToString(), "val_e"); + + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Prev(); // num_internal_keys_skipped counter resets here. + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + + // Test that skipping separate keys is handled + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("b"); + internal_iter->AddDeletion("c"); + internal_iter->AddDeletion("d"); + internal_iter->AddPut("e", "val_e"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "e"); + ASSERT_EQ(db_iter->value().ToString(), "val_e"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + + // Test if alternating puts and deletes of the same key are handled correctly. + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddDeletion("c"); + internal_iter->AddPut("d", "val_d"); + internal_iter->AddDeletion("d"); + internal_iter->AddPut("e", "val_e"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = 2; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, 0, nullptr, false, false, + false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "e"); + ASSERT_EQ(db_iter->value().ToString(), "val_e"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + + // Test for large number of skippable internal keys with *default* + // max_sequential_skip_in_iterations. + { + for (size_t i = 1; i <= 200; ++i) { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + for (size_t j = 1; j <= i; ++j) { + internal_iter->AddPut("b", "val_b"); + internal_iter->AddDeletion("b"); + } + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + ro.max_skippable_internal_keys = i; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 2 * i + 1, options.max_sequential_skip_in_iterations, + 0, nullptr, false, false, false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + if ((options.max_sequential_skip_in_iterations + 1) >= + ro.max_skippable_internal_keys) { + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } else { + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + } + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Prev(); + if ((options.max_sequential_skip_in_iterations + 1) >= + ro.max_skippable_internal_keys) { + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } else { + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + } + } + } + + // Test for large number of skippable internal keys with a *non-default* + // max_sequential_skip_in_iterations. + { + for (size_t i = 1; i <= 200; ++i) { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + for (size_t j = 1; j <= i; ++j) { + internal_iter->AddPut("b", "val_b"); + internal_iter->AddDeletion("b"); + } + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + options.max_sequential_skip_in_iterations = 1000; + ro.max_skippable_internal_keys = i; + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), + internal_iter, 2 * i + 1, options.max_sequential_skip_in_iterations, + 0, nullptr, false, false, false, ro.max_skippable_internal_keys)); + + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "val_a"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "val_c"); + + db_iter->Prev(); + ASSERT_TRUE(!db_iter->Valid()); + ASSERT_TRUE(db_iter->status().IsIncomplete()); + } + } +} + TEST_F(DBIteratorTest, DBIterator1) { Options options; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d6195e9e70..60ccace65c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -997,6 +997,12 @@ struct ReadOptions { // Default: false bool ignore_range_deletions; + // A threshold for the number of keys that can be skipped before failing an + // iterator seek as incomplete. The default value of 0 should be used to + // never fail a request as incomplete, even on skipping too many keys. + // Default: 0 + uint64_t max_skippable_internal_keys; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/util/options.cc b/util/options.cc index 58edee85ca..612d4b2eb0 100644 --- a/util/options.cc +++ b/util/options.cc @@ -592,8 +592,8 @@ ReadOptions::ReadOptions() pin_data(false), background_purge_on_iterator_cleanup(false), readahead_size(0), - ignore_range_deletions(false) { -} + ignore_range_deletions(false), + max_skippable_internal_keys(0) {} ReadOptions::ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), @@ -608,7 +608,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) pin_data(false), background_purge_on_iterator_cleanup(false), readahead_size(0), - ignore_range_deletions(false) { -} + ignore_range_deletions(false), + max_skippable_internal_keys(0) {} } // namespace rocksdb