Change and clarify the relationship between Valid(), status() and Seek*() for all iterators. Also fix some bugs

Summary:
Before this PR, Iterator/InternalIterator could simultaneously have a non-ok status() and Valid() = true. That state meant that the last operation failed, but the iterator was nevertheless positioned on some unspecified record. The likely intended uses of that state are:
 * If some sst files are corrupted, a normal iterator can be used to read the data from files that are not corrupted.
 * When using read_tier = kBlockCacheTier, read the data that's in block cache, skipping over the data that is not.

However, this behavior wasn't documented well (and until recently the wiki on github had misleading, incorrect information). In the code there was a lot of confusion about the relationship between status() and Valid(), and about whether Seek()/SeekToLast()/etc. reset the status or not. This confusion caused a number of bugs, both inside rocksdb and in the code that uses rocksdb (including ours).

This PR changes the convention to:
 * If status() is not ok, Valid() always returns false.
 * Any seek operation resets status. (Before the PR, it depended on iterator type and on particular error.)

This does sacrifice the two use cases listed above, but siying said it's ok.

Overview of the changes:
 * A commit that adds missing status checks in MergingIterator. This fixes a bug that actually affects us, and we need it fixed. `DBIteratorTest.NonBlockingIterationBugRepro` explains the scenario.
 * Changes to lots of iterator types to make all of them conform to the new convention. Some bug fixes along the way. By far the biggest changes are in DBIter, which is a big messy piece of code; I tried to make it less big and messy but mostly failed.
 * A stress-test for DBIter, to gain some confidence that I didn't break it. It does a few million random operations on the iterator, while occasionally modifying the underlying data (like ForwardIterator does) and occasionally returning non-ok status from internal iterator.

To find the iterator types that needed changes I searched for "public .*Iterator" in the code. Here's an overview of all 27 iterator types:

Iterators that didn't need changes:
 * status() is always ok(), or Valid() is always false: MemTableIterator, ModelIter, TestIterator, KVIter (2 classes with this name in anonymous namespaces), LoggingForwardVectorIterator, VectorIterator, MockTableIterator, EmptyIterator, EmptyInternalIterator.
 * Thin wrappers that always pass through Valid() and status(): ArenaWrappedDBIter, TtlIterator, InternalIteratorFromIterator.

Iterators with changes (see inline comments for details):
 * DBIter - an overhaul:
    - It used to silently skip corrupted keys (`FindParseableKey()`), which seems dangerous. This PR makes it stop immediately after encountering a corrupted key, just like it would for other kinds of corruption. Let me know if there was actually some deeper meaning in this behavior and I should put it back.
    - It had a few code paths silently discarding subiterator's status. The stress test caught a few.
    - The backwards iteration code path was expecting the internal iterator's set of keys to be immutable. It's probably always true in practice at the moment, since ForwardIterator doesn't support backwards iteration, but this PR fixes it anyway. See added DBIteratorTest.ReverseToForwardBug for an example.
    - Some parts of backwards iteration code path even did things like `assert(iter_->Valid())` after a seek, which is never a safe assumption.
    - It used to not reset status on seek for some types of errors.
    - Some simplifications and better comments.
    - Some things got more complicated from the added error handling. I'm open to ideas for how to make it nicer.
 * MergingIterator - check status after every operation on every subiterator, and in some places assert that valid subiterators have ok status.
 * ForwardIterator - changed to the new convention, also slightly simplified.
 * ForwardLevelIterator - fixed some bugs and simplified.
 * LevelIterator - simplified.
 * TwoLevelIterator - changed to the new convention. Also fixed a bug that would make SeekForPrev() sometimes silently ignore errors from first_level_iter_.
 * BlockBasedTableIterator - minor changes.
 * BlockIter - replaced `SetStatus()` with `Invalidate()` to make sure non-ok BlockIter is always invalid.
 * PlainTableIterator - some seeks used to not reset status.
 * CuckooTableIterator - tiny code cleanup.
 * ManagedIterator - fixed some bugs.
 * BaseDeltaIterator - changed to the new convention and fixed a bug.
 * BlobDBIterator - seeks used to not reset status.
 * KeyConvertingIterator - some small change.
Closes https://github.com/facebook/rocksdb/pull/3810

Differential Revision: D7888019

Pulled By: al13n321

fbshipit-source-id: 4aaf6d3421c545d16722a815b2fa2e7912bc851d
Mike Kolupaev 2018-05-17 02:44:14 -07:00 committed by Facebook Github Bot
parent 46fde6b653
commit 8bf555f487
28 changed files with 1345 additions and 382 deletions


@@ -845,6 +845,7 @@ if(WITH_TESTS)
db/db_inplace_update_test.cc
db/db_io_failure_test.cc
db/db_iter_test.cc
db/db_iter_stress_test.cc
db/db_iterator_test.cc
db/db_log_iter_test.cc
db/db_memtable_test.cc


@@ -7,6 +7,8 @@
* Touch-up to write-related counters in PerfContext. New counters added: write_scheduling_flushes_compactions_time, write_thread_wait_nanos. Counters whose behavior was fixed or modified: write_memtable_time, write_pre_and_post_process_time, write_delay_time.
* Posix Env's NewRandomRWFile() will fail if the file doesn't exist.
* Now, `DBOptions::use_direct_io_for_flush_and_compaction` only applies to background writes, and `DBOptions::use_direct_reads` applies to both user reads and background reads. This conforms with Linux's `open(2)` manpage, which advises against simultaneously reading a file in buffered and direct modes, due to possibly undefined behavior and degraded performance.
* Iterator::Valid() always returns false if !status().ok(). So, now when doing a Seek() followed by some Next()s, there's no need to check status() after every operation.
* Iterator::Seek()/SeekForPrev()/SeekToFirst()/SeekToLast() always resets status().
### New Features
* Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.


@@ -402,6 +402,7 @@ TESTS = \
db_blob_index_test \
db_bloom_filter_test \
db_iter_test \
db_iter_stress_test \
db_log_iter_test \
db_compaction_filter_test \
db_compaction_test \
@@ -1195,6 +1196,9 @@ db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS)
db_iter_test: db/db_iter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_iter_stress_test: db/db_iter_stress_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_universal_compaction_test: db/db_universal_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)


@@ -560,6 +560,11 @@ ROCKS_TESTS = [
"db/db_iter_test.cc",
"serial",
],
[
"db_iter_stress_test",
"db/db_iter_stress_test.cc",
"serial",
],
[
"db_iterator_test",
"db/db_iterator_test.cc",


@@ -333,9 +333,9 @@ TEST_F(CorruptionTest, TableFileIndexData) {
Corrupt(kTableFile, -2000, 500);
Reopen();
dbi = reinterpret_cast<DBImpl*>(db_);
// one full file should be readable, since only one was corrupted
// one full file may be readable, since only one was corrupted
// the other file should be fully non-readable, since index was corrupted
Check(5000, 5000);
Check(0, 5000);
ASSERT_NOK(dbi->VerifyChecksum());
}


@@ -52,8 +52,13 @@ class DBIter final: public Iterator {
public:
// The following is grossly complicated. TODO: clean it up
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (1) When moving forward:
// (1a) if current_entry_is_merged_ = false, the internal iterator is
// positioned at the exact entry that yields this->key(), this->value()
// (1b) if current_entry_is_merged_ = true, the internal iterator is
// positioned immediately after the last entry that contributed to the
// current this->value(). That entry may or may not have key equal to
// this->key().
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum Direction {
@@ -194,6 +199,7 @@ class DBIter final: public Iterator {
if (status_.ok()) {
return iter_->status();
} else {
assert(!valid_);
return status_;
}
}
@@ -235,18 +241,21 @@ class DBIter final: public Iterator {
void set_valid(bool v) { valid_ = v; }
private:
void ReverseToForward();
void ReverseToBackward();
void PrevInternal();
void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
// For all methods in this block:
// PRE: iter_->Valid() && status_.ok()
// Return false if there was an error, and status() is non-ok, valid_ = false;
// in this case callers would usually stop what they were doing and return.
bool ReverseToForward();
bool ReverseToBackward();
bool FindValueForCurrentKey();
bool FindValueForCurrentKeyUsingSeek();
void FindPrevUserKey();
void FindNextUserKey();
inline void FindNextUserEntry(bool skipping, bool prefix_check);
void FindNextUserEntryInternal(bool skipping, bool prefix_check);
bool FindUserKeyBeforeSavedKey();
inline bool FindNextUserEntry(bool skipping, bool prefix_check);
bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
bool MergeValuesNewToOld();
void PrevInternal();
bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence);
@@ -336,6 +345,7 @@ class DBIter final: public Iterator {
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_->key(), ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
valid_ = false;
ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
iter_->key().ToString(true).c_str());
return false;
@@ -346,12 +356,16 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
void DBIter::Next() {
assert(valid_);
assert(status_.ok());
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (direction_ == kReverse) {
ReverseToForward();
if (!ReverseToForward()) {
ok = false;
}
} else if (iter_->Valid() && !current_entry_is_merged_) {
// If the current value is not a merge, the iter position is the
// current key, which is already returned. We can safely issue a
@@ -365,13 +379,12 @@ void DBIter::Next() {
if (statistics_ != nullptr) {
local_stats_.next_count_++;
}
// Now we point to the next internal position, for both of merge and
// not merge cases.
if (!iter_->Valid()) {
if (ok && iter_->Valid()) {
FindNextUserEntry(true /* skipping the current user key */,
prefix_same_as_start_);
} else {
valid_ = false;
return;
}
FindNextUserEntry(true /* skipping the current user key */, prefix_same_as_start_);
if (statistics_ != nullptr && valid_) {
local_stats_.next_found_count_++;
local_stats_.bytes_read_ += (key().size() + value().size());
@@ -392,15 +405,16 @@ void DBIter::Next() {
// keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations.
inline void DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
PERF_TIMER_GUARD(find_next_user_entry_time);
FindNextUserEntryInternal(skipping, prefix_check);
return FindNextUserEntryInternal(skipping, prefix_check);
}
// Actual implementation of DBIter::FindNextUserEntry()
void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(status_.ok());
assert(direction_ == kForward);
current_entry_is_merged_ = false;
@@ -420,9 +434,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
do {
if (!ParseKey(&ikey_)) {
// Skip corrupted keys.
iter_->Next();
continue;
return false;
}
if (iterate_upper_bound_ != nullptr &&
@@ -437,7 +449,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
}
if (TooManyInternalKeysSkipped()) {
return;
return false;
}
if (IsVisible(ikey_.sequence)) {
@@ -455,18 +467,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// if iterartor specified start_seqnum we
// 1) return internal key, including the type
// 2) return ikey only if ikey.seqnum >= start_seqnum_
// not that if deletion seqnum is < start_seqnum_ we
// note that if deletion seqnum is < start_seqnum_ we
// just skip it like in normal iterator.
if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
saved_key_.SetInternalKey(ikey_);
valid_=true;
return;
valid_ = true;
return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
}
break;
case kTypeValue:
@@ -480,7 +492,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
if (ikey_.sequence >= start_seqnum_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return;
return true;
} else {
// this key and all previous versions shouldn't be included,
// skipping
@@ -507,14 +519,15 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
} else {
is_blob_ = true;
valid_ = true;
return false;
}
return;
is_blob_ = true;
valid_ = true;
return true;
} else {
valid_ = true;
return;
return true;
}
}
break;
@@ -535,8 +548,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// value
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
return MergeValuesNewToOld(); // Go to a different state machine
}
break;
default:
@@ -545,13 +557,14 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
}
}
} else {
// This key was inserted after our snapshot was taken.
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
// Here saved_key_ may contain some old key, or the default empty key, or
// key assigned by some random other method. We don't care.
if (user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
0) {
// This key was inserted after our snapshot was taken.
// If this happens too many times in a row for the same user key, we want
// to seek to the target sequence number.
int cmp =
user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
if (cmp == 0 || (skipping && cmp <= 0)) {
num_skipped++;
} else {
saved_key_.SetUserKey(
@@ -591,7 +604,9 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
iter_->Next();
}
} while (iter_->Valid());
valid_ = false;
return iter_->status().ok();
}
// Merge values of the same user key starting from the current iter_ position
@@ -600,12 +615,12 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
bool DBIter::MergeValuesNewToOld() {
if (!merge_operator_) {
ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
status_ = Status::InvalidArgument("merge_operator_ must be set.");
valid_ = false;
return;
return false;
}
// Temporarily pin the blocks that hold merge operands
@@ -621,8 +636,7 @@ void DBIter::MergeValuesNewToOld() {
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) {
// skip corrupted key
continue;
return false;
}
if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
@@ -639,7 +653,6 @@ void DBIter::MergeValuesNewToOld() {
} else if (kTypeValue == ikey.type) {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice val = iter_->value();
s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
@@ -647,10 +660,15 @@ void DBIter::MergeValuesNewToOld() {
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
// iter_ is positioned after put
iter_->Next();
return;
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
return true;
} else if (kTypeMerge == ikey.type) {
// hit a merge, add the value as an operand and run associative merge.
// when complete, add result to operands and continue.
@@ -668,12 +686,17 @@ void DBIter::MergeValuesNewToOld() {
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return;
return false;
} else {
assert(false);
}
}
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge operator, such that
@@ -685,17 +708,27 @@ void DBIter::MergeValuesNewToOld() {
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
assert(status_.ok());
return true;
}
void DBIter::Prev() {
assert(valid_);
assert(status_.ok());
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (direction_ == kForward) {
ReverseToBackward();
if (!ReverseToBackward()) {
ok = false;
}
}
if (ok) {
PrevInternal();
}
PrevInternal();
if (statistics_ != nullptr) {
local_stats_.prev_count_++;
if (valid_) {
@@ -705,71 +738,76 @@ void DBIter::Prev() {
}
}
void DBIter::ReverseToForward() {
if (prefix_extractor_ != nullptr && !total_order_seek_) {
bool DBIter::ReverseToForward() {
assert(iter_->status().ok());
// When moving backwards, iter_ is positioned on _previous_ key, which may
// not exist or may have different prefix than the current key().
// If that's the case, seek iter_ to current key.
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
iter_->Seek(last_key.GetInternalKey());
}
FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
range_del_agg_.InvalidateTombstoneMapPositions();
// Skip keys less than the current key() (a.k.a. saved_key_).
while (iter_->Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
}
if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
0) {
return true;
}
iter_->Next();
}
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
return true;
}
void DBIter::ReverseToBackward() {
if (prefix_extractor_ != nullptr && !total_order_seek_) {
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(saved_key_.GetUserKey(), 0,
kValueTypeForSeekForPrev));
iter_->SeekForPrev(last_key.GetInternalKey());
}
if (current_entry_is_merged_) {
// Not placed in the same key. Need to call Prev() until finding the
// previous key.
if (!iter_->Valid()) {
iter_->SeekToLast();
range_del_agg_.InvalidateTombstoneMapPositions();
}
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >
0) {
assert(ikey.sequence != kMaxSequenceNumber);
if (!IsVisible(ikey.sequence)) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
iter_->Prev();
FindParseableKey(&ikey, kReverse);
}
}
#ifndef NDEBUG
if (iter_->Valid()) {
ParsedInternalKey ikey;
assert(ParseKey(&ikey));
assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) <=
0);
}
#endif
// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
assert(iter_->status().ok());
// When current_entry_is_merged_ is true, iter_ may be positioned on the next
// key, which may not exist or may have prefix different from current.
// If that's the case, seek to saved_key_.
if (current_entry_is_merged_ &&
((prefix_extractor_ != nullptr && !total_order_seek_) ||
!iter_->Valid())) {
IterKey last_key;
// Using kMaxSequenceNumber and kValueTypeForSeek
// (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
// than saved_key_.
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
if (prefix_extractor_ != nullptr && !total_order_seek_) {
iter_->SeekForPrev(last_key.GetInternalKey());
} else {
// Some iterators may not support SeekForPrev(), so we avoid using it
// when prefix seek mode is disabled. This is somewhat expensive
// (an extra Prev(), as well as an extra change of direction of iter_),
// so we may need to reconsider it later.
iter_->Seek(last_key.GetInternalKey());
if (!iter_->Valid() && iter_->status().ok()) {
iter_->SeekToLast();
}
}
}
FindPrevUserKey();
direction_ = kReverse;
return FindUserKeyBeforeSavedKey();
}
void DBIter::PrevInternal() {
if (!iter_->Valid()) {
valid_ = false;
return;
}
ParsedInternalKey ikey;
while (iter_->Valid()) {
saved_key_.SetUserKey(
ExtractUserKey(iter_->key()),
@@ -791,38 +829,41 @@ void DBIter::PrevInternal() {
return;
}
if (FindValueForCurrentKey()) {
if (!iter_->Valid()) {
return;
}
FindParseableKey(&ikey, kReverse);
if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
FindPrevUserKey();
}
if (!FindValueForCurrentKey()) { // assigns valid_
return;
}
// Whether or not we found a value for current key, we need iter_ to end up
// on a smaller key.
if (!FindUserKeyBeforeSavedKey()) {
return;
}
if (valid_) {
// Found the value.
return;
}
if (TooManyInternalKeysSkipped(false)) {
return;
}
if (!iter_->Valid()) {
break;
}
FindParseableKey(&ikey, kReverse);
if (user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
FindPrevUserKey();
}
}
// We haven't found any key - iterator is not valid
// Or the prefix is different than start prefix
assert(!iter_->Valid());
valid_ = false;
}
// This function checks, if the entry with biggest sequence_number <= sequence_
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
// Used for backwards iteration.
// Looks at the entries with user key saved_key_ and finds the most up-to-date
// value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
// the entry just before them, or on the entry just after them.
bool DBIter::FindValueForCurrentKey() {
assert(iter_->Valid());
merge_context_.Clear();
@@ -832,20 +873,27 @@ bool DBIter::FindValueForCurrentKey() {
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
// Temporarily pin blocks that hold (merge operands / the value)
ReleaseTempPinnedData();
TempPinData();
size_t num_skipped = 0;
while (iter_->Valid() && IsVisible(ikey.sequence) &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
while (iter_->Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
}
if (!IsVisible(ikey.sequence) ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
if (TooManyInternalKeysSkipped()) {
return false;
}
// We iterate too much: let's use Seek() to avoid too much key comparisons
// This user key has lots of entries.
// We're going from old to new, and it's taking too long. Let's do a Seek()
// and go from new to old. This helps when a key was overwritten many times.
if (num_skipped >= max_skip_) {
return FindValueForCurrentKeyUsingSeek();
}
@@ -892,10 +940,13 @@ bool DBIter::FindValueForCurrentKey() {
}
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
iter_->Prev();
++num_skipped;
FindParseableKey(&ikey, kReverse);
}
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
Status s;
@@ -905,7 +956,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeSingleDeletion:
case kTypeRangeDeletion:
valid_ = false;
return false;
return true;
case kTypeMerge:
current_entry_is_merged_ = true;
if (last_not_merge_type == kTypeDeletion ||
@@ -926,7 +977,7 @@ bool DBIter::FindValueForCurrentKey() {
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return true;
return false;
} else {
assert(last_not_merge_type == kTypeValue);
s = MergeHelper::TimedFullMerge(
@@ -936,7 +987,7 @@ bool DBIter::FindValueForCurrentKey() {
}
break;
case kTypeValue:
// do nothing - we've already has value in saved_value_
// do nothing - we've already has value in pinned_value_
break;
case kTypeBlobIndex:
if (!allow_blob_) {
@@ -945,7 +996,7 @@ bool DBIter::FindValueForCurrentKey() {
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return true;
return false;
}
is_blob_ = true;
break;
@@ -953,17 +1004,19 @@ bool DBIter::FindValueForCurrentKey() {
assert(false);
break;
}
if (s.ok()) {
valid_ = true;
} else {
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
valid_ = true;
return true;
}
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
// Would be nice to reuse some code.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
// FindValueForCurrentKey will enable pinning before calling
// FindValueForCurrentKeyUsingSeek()
@@ -974,26 +1027,38 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
// assume there is at least one parseable key for this user key
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
assert(iter_->Valid());
assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
// In case read_callback presents, the value we seek to may not be visible.
// Seek for the next value that's visible.
while (!IsVisible(ikey.sequence)) {
// Find the next value that's visible.
ParsedInternalKey ikey;
while (true) {
if (!iter_->Valid()) {
valid_ = false;
return iter_->status().ok();
}
if (!ParseKey(&ikey)) {
return false;
}
if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
// No visible values for this key, even though FindValueForCurrentKey()
// has seen some. This is possible if we're using a tailing iterator, and
// the entries were discarded in a compaction.
valid_ = false;
return true;
}
if (IsVisible(ikey.sequence)) {
break;
}
iter_->Next();
FindParseableKey(&ikey, kForward);
assert(iter_->Valid());
assert(user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()));
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
valid_ = false;
return false;
return true;
}
if (ikey.type == kTypeBlobIndex && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
@@ -1001,7 +1066,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return true;
return false;
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_->IsValuePinned());
@@ -1012,115 +1077,147 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// kTypeMerge. We need to collect all kTypeMerge values and save them
// in operands
assert(ikey.type == kTypeMerge);
current_entry_is_merged_ = true;
merge_context_.Clear();
while (
iter_->Valid() &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) &&
ikey.type == kTypeMerge &&
!range_del_agg_.ShouldDelete(
ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
while (true) {
iter_->Next();
FindParseableKey(&ikey, kForward);
}
Status s;
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
if (!iter_->Valid()) {
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
break;
}
if (s.ok()) {
if (!ParseKey(&ikey)) {
return false;
}
if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey,
RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
break;
} else if (ikey.type == kTypeValue) {
const Slice val = iter_->value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &val,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
valid_ = true;
} else {
return true;
} else if (ikey.type == kTypeMerge) {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
status_ = s;
return false;
} else {
assert(false);
}
return true;
}
const Slice& val = iter_->value();
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
&val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
if (s.ok()) {
valid_ = true;
} else {
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
// Make sure we leave iter_ in a good state. If it's valid and we don't care
// about prefixes, that's already good enough. Otherwise it needs to be
// seeked to the current key.
if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
if (prefix_extractor_ != nullptr && !total_order_seek_) {
iter_->SeekForPrev(last_key);
} else {
iter_->Seek(last_key);
if (!iter_->Valid() && iter_->status().ok()) {
iter_->SeekToLast();
}
}
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
valid_ = true;
return true;
}
// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
if (!iter_->Valid()) {
return;
}
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
while (iter_->Valid() &&
!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
iter_->Next();
FindParseableKey(&ikey, kForward);
}
}
// Go to previous user_key
void DBIter::FindPrevUserKey() {
if (!iter_->Valid()) {
return;
}
// Move backwards until we find a key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
assert(status_.ok());
size_t num_skipped = 0;
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
int cmp;
while (iter_->Valid() &&
((cmp = user_comparator_->Compare(ikey.user_key,
saved_key_.GetUserKey())) == 0 ||
(cmp > 0 && !IsVisible(ikey.sequence)))) {
if (TooManyInternalKeysSkipped()) {
return;
while (iter_->Valid()) {
ParsedInternalKey ikey;
if (!ParseKey(&ikey)) {
return false;
}
if (cmp == 0) {
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
iter_->Seek(last_key.GetInternalKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
} else {
++num_skipped;
}
if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
return true;
}
if (TooManyInternalKeysSkipped()) {
return false;
}
assert(ikey.sequence != kMaxSequenceNumber);
if (!IsVisible(ikey.sequence)) {
PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
} else {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
// It would be more efficient to use SeekForPrev() here, but some
// iterators may not support it.
iter_->Seek(last_key.GetInternalKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
if (!iter_->Valid()) {
break;
}
} else {
++num_skipped;
}
iter_->Prev();
FindParseableKey(&ikey, kReverse);
}
if (!iter_->status().ok()) {
valid_ = false;
return false;
}
return true;
}
bool DBIter::TooManyInternalKeysSkipped(bool increment) {
@@ -1140,19 +1237,9 @@ bool DBIter::IsVisible(SequenceNumber sequence) {
(read_callback_ == nullptr || read_callback_->IsCommitted(sequence));
}
// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
while (iter_->Valid() && !ParseKey(ikey)) {
if (direction == kReverse) {
iter_->Prev();
} else {
iter_->Next();
}
}
}
void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
saved_key_.Clear();
@@ -1201,6 +1288,7 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekForPrev(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
saved_key_.Clear();
@@ -1249,15 +1337,16 @@ void DBIter::SeekForPrev(const Slice& target) {
}
void DBIter::SeekToFirst() {
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
if (iterate_lower_bound_ != nullptr) {
Seek(*iterate_lower_bound_);
return;
}
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (prefix_extractor_ != nullptr && !total_order_seek_) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
direction_ = kForward;
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
@@ -1293,11 +1382,22 @@ void DBIter::SeekToFirst() {
}
void DBIter::SeekToLast() {
if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);
if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) {
ReleaseTempPinnedData();
PrevInternal();
}
return;
}
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (prefix_extractor_ != nullptr) {
if (prefix_extractor_ != nullptr && !total_order_seek_) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
direction_ = kReverse;
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
@@ -1308,21 +1408,7 @@ void DBIter::SeekToLast() {
iter_->SeekToLast();
range_del_agg_.InvalidateTombstoneMapPositions();
}
// When the iterate_upper_bound is set to a value,
// it will seek to the last key before the
// ReadOptions.iterate_upper_bound
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
SeekForPrev(*iterate_upper_bound_);
range_del_agg_.InvalidateTombstoneMapPositions();
if (!Valid()) {
return;
} else if (user_comparator_->Equal(*iterate_upper_bound_, key())) {
ReleaseTempPinnedData();
PrevInternal();
}
} else {
PrevInternal();
}
PrevInternal();
if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_SEEK);
if (valid_) {

db/db_iter_stress_test.cc (new file, 652 lines)
@@ -0,0 +1,652 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "utilities/merge_operators.h"
#ifdef GFLAGS
#include "util/gflags_compat.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
DEFINE_bool(verbose, false,
"Print huge, detailed trace. Intended for debugging failures.");
#else
void ParseCommandLineFlags(int*, char***, bool) {}
bool FLAGS_verbose = false;
#endif
namespace rocksdb {
class DBIteratorStressTest : public testing::Test {
public:
Env* env_;
DBIteratorStressTest() : env_(Env::Default()) {}
};
namespace {
struct Entry {
std::string key;
ValueType type; // kTypeValue, kTypeDeletion, kTypeMerge
uint64_t sequence;
std::string ikey; // internal key, made from `key`, `sequence` and `type`
std::string value;
// If false, we'll pretend that this entry doesn't exist.
bool visible = true;
bool operator<(const Entry& e) const {
if (key != e.key) return key < e.key;
return std::tie(sequence, type) > std::tie(e.sequence, e.type);
}
};
struct Data {
std::vector<Entry> entries;
// Indices in `entries` with `visible` = false.
std::vector<size_t> hidden;
// Keys of entries whose `visible` changed since the last seek of iterators.
std::set<std::string> recently_touched_keys;
};
struct StressTestIterator : public InternalIterator {
Data* data;
Random64* rnd;
InternalKeyComparator cmp;
// Each operation will return error with this probability...
double error_probability = 0;
// ... and add/remove entries with this probability.
double mutation_probability = 0;
// The probability of adding vs removing entries will be chosen so that the
// amount of removed entries stays somewhat close to this number.
double target_hidden_fraction = 0;
// If true, print all mutations to stdout for debugging.
bool trace = false;
int iter = -1;
Status status_;
StressTestIterator(Data* _data, Random64* _rnd, const Comparator* _cmp)
: data(_data), rnd(_rnd), cmp(_cmp) {}
bool Valid() const override {
if (iter >= 0 && iter < (int)data->entries.size()) {
assert(status_.ok());
return true;
}
return false;
}
Status status() const override { return status_; }
bool MaybeFail() {
if (rnd->Next() >=
std::numeric_limits<uint64_t>::max() * error_probability) {
return false;
}
if (rnd->Next() % 2) {
status_ = Status::Incomplete("test");
} else {
status_ = Status::IOError("test");
}
if (trace) {
std::cout << "injecting " << status_.ToString() << std::endl;
}
iter = -1;
return true;
}
void MaybeMutate() {
if (rnd->Next() >=
std::numeric_limits<uint64_t>::max() * mutation_probability) {
return;
}
do {
// If too many entries are hidden, hide less, otherwise hide more.
double hide_probability =
data->hidden.size() > data->entries.size() * target_hidden_fraction
? 1. / 3
: 2. / 3;
if (data->hidden.empty()) {
hide_probability = 1;
}
bool do_hide =
rnd->Next() < std::numeric_limits<uint64_t>::max() * hide_probability;
if (do_hide) {
// Hide a random entry.
size_t idx = rnd->Next() % data->entries.size();
Entry& e = data->entries[idx];
if (e.visible) {
if (trace) {
std::cout << "hiding idx " << idx << std::endl;
}
e.visible = false;
data->hidden.push_back(idx);
data->recently_touched_keys.insert(e.key);
} else {
// Already hidden. Let's go unhide something instead, just because
// it's easy and it doesn't really matter what we do.
do_hide = false;
}
}
if (!do_hide) {
// Unhide a random entry.
size_t hi = rnd->Next() % data->hidden.size();
size_t idx = data->hidden[hi];
if (trace) {
std::cout << "unhiding idx " << idx << std::endl;
}
Entry& e = data->entries[idx];
assert(!e.visible);
e.visible = true;
data->hidden[hi] = data->hidden.back();
data->hidden.pop_back();
data->recently_touched_keys.insert(e.key);
}
} while (rnd->Next() % 3 != 0); // do 3 mutations on average
}
void SkipForward() {
while (iter < (int)data->entries.size() && !data->entries[iter].visible) {
++iter;
}
}
void SkipBackward() {
while (iter >= 0 && !data->entries[iter].visible) {
--iter;
}
}
void SeekToFirst() override {
if (MaybeFail()) return;
MaybeMutate();
status_ = Status::OK();
iter = 0;
SkipForward();
}
void SeekToLast() override {
if (MaybeFail()) return;
MaybeMutate();
status_ = Status::OK();
iter = (int)data->entries.size() - 1;
SkipBackward();
}
void Seek(const Slice& target) override {
if (MaybeFail()) return;
MaybeMutate();
status_ = Status::OK();
// Binary search.
auto it = std::partition_point(
data->entries.begin(), data->entries.end(),
[&](const Entry& e) { return cmp.Compare(e.ikey, target) < 0; });
iter = (int)(it - data->entries.begin());
SkipForward();
}
void SeekForPrev(const Slice& target) override {
if (MaybeFail()) return;
MaybeMutate();
status_ = Status::OK();
// Binary search.
auto it = std::partition_point(
data->entries.begin(), data->entries.end(),
[&](const Entry& e) { return cmp.Compare(e.ikey, target) <= 0; });
iter = (int)(it - data->entries.begin());
--iter;
SkipBackward();
}
void Next() override {
assert(Valid());
if (MaybeFail()) return;
MaybeMutate();
++iter;
SkipForward();
}
void Prev() override {
assert(Valid());
if (MaybeFail()) return;
MaybeMutate();
--iter;
SkipBackward();
}
Slice key() const override {
assert(Valid());
return data->entries[iter].ikey;
}
Slice value() const override {
assert(Valid());
return data->entries[iter].value;
}
bool IsKeyPinned() const override { return true; }
bool IsValuePinned() const override { return true; }
};
// A small reimplementation of DBIter, supporting only some of the features,
// and doing everything in O(log n).
// Skips all keys that are in recently_touched_keys.
struct ReferenceIterator {
Data* data;
uint64_t sequence; // ignore entries with sequence number below this
bool valid = false;
std::string key;
std::string value;
ReferenceIterator(Data* _data, uint64_t _sequence)
: data(_data), sequence(_sequence) {}
bool Valid() const { return valid; }
// Finds the first entry with key
// greater/less/greater-or-equal/less-or-equal than `key`, depending on
// arguments: if `skip`, inequality is strict; if `forward`, it's
// greater/greater-or-equal, otherwise less/less-or-equal.
// Sets `key` to the result.
// If no such key exists, returns false. Doesn't check `visible`.
bool FindNextKey(bool skip, bool forward) {
valid = false;
auto it = std::partition_point(data->entries.begin(), data->entries.end(),
[&](const Entry& e) {
if (forward != skip) {
return e.key < key;
} else {
return e.key <= key;
}
});
if (forward) {
if (it != data->entries.end()) {
key = it->key;
return true;
}
} else {
if (it != data->entries.begin()) {
--it;
key = it->key;
return true;
}
}
return false;
}
bool FindValueForCurrentKey() {
if (data->recently_touched_keys.count(key)) {
return false;
}
// Find the first entry for the key. The caller promises that it exists.
auto it = std::partition_point(data->entries.begin(), data->entries.end(),
[&](const Entry& e) {
if (e.key != key) {
return e.key < key;
}
return e.sequence > sequence;
});
// Find the first visible entry.
for (;; ++it) {
if (it == data->entries.end()) {
return false;
}
Entry& e = *it;
if (e.key != key) {
return false;
}
assert(e.sequence <= sequence);
if (!e.visible) continue;
if (e.type == kTypeDeletion) {
return false;
}
if (e.type == kTypeValue) {
value = e.value;
valid = true;
return true;
}
assert(e.type == kTypeMerge);
break;
}
// Collect merge operands.
std::vector<Slice> operands;
for (; it != data->entries.end(); ++it) {
Entry& e = *it;
if (e.key != key) {
break;
}
assert(e.sequence <= sequence);
if (!e.visible) continue;
if (e.type == kTypeDeletion) {
break;
}
operands.push_back(e.value);
if (e.type == kTypeValue) {
break;
}
}
// Do a merge.
value = operands.back().ToString();
for (int i = (int)operands.size() - 2; i >= 0; --i) {
value.append(",");
value.append(operands[i].data(), operands[i].size());
}
valid = true;
return true;
}
// Start at `key` and move until we encounter a valid value.
// `forward` defines the direction of movement.
// If `skip` is true, we're looking for key not equal to `key`.
void DoTheThing(bool skip, bool forward) {
while (FindNextKey(skip, forward) && !FindValueForCurrentKey()) {
skip = true;
}
}
void Seek(const Slice& target) {
key = target.ToString();
DoTheThing(false, true);
}
void SeekForPrev(const Slice& target) {
key = target.ToString();
DoTheThing(false, false);
}
void SeekToFirst() { Seek(""); }
void SeekToLast() {
key = data->entries.back().key;
DoTheThing(false, false);
}
void Next() {
assert(Valid());
DoTheThing(true, true);
}
void Prev() {
assert(Valid());
DoTheThing(true, false);
}
};
} // namespace
// Use an internal iterator that sometimes returns errors and sometimes
// adds/removes entries on the fly. Do random operations on a DBIter and
// check results.
// TODO: can be improved for more coverage:
// * Override IsKeyPinned() and IsValuePinned() to actually use
// PinnedIteratorManager and check that there's no use-after free.
// * Try different combinations of prefix_extractor, total_order_seek,
// prefix_same_as_start, iterate_lower_bound, iterate_upper_bound.
TEST_F(DBIteratorStressTest, StressTest) {
// We use a deterministic RNG, and everything happens in a single thread.
Random64 rnd(826909345792864532ll);
auto gen_key = [&](int max_key) {
int len = 0;
int a = max_key;
while (a) {
a /= 10;
++len;
}
std::string s = ToString(rnd.Next() % (uint64_t)max_key);
s.insert(0, len - (int)s.size(), '0');
return s;
};
Options options;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
ReadOptions ropt;
size_t num_matching = 0;
size_t num_at_end = 0;
size_t num_not_ok = 0;
size_t num_recently_removed = 0;
// Number of iterations for each combination of parameters
// (there are ~250 of those).
// Tweak this to change the test run time.
// As of the time of writing, the test takes ~4 seconds for value of 5000.
const int num_iterations = 5000;
// Enable this to print all the operations for debugging.
bool trace = FLAGS_verbose;
for (int num_entries : {5, 10, 100}) {
for (double key_space : {0.1, 1.0, 3.0}) {
for (ValueType prevalent_entry_type :
{kTypeValue, kTypeDeletion, kTypeMerge}) {
for (double error_probability : {0.01, 0.1}) {
for (double mutation_probability : {0.01, 0.5}) {
for (double target_hidden_fraction : {0.1, 0.5}) {
std::string trace_str =
"entries: " + ToString(num_entries) +
", key_space: " + ToString(key_space) +
", error_probability: " + ToString(error_probability) +
", mutation_probability: " + ToString(mutation_probability) +
", target_hidden_fraction: " +
ToString(target_hidden_fraction);
SCOPED_TRACE(trace_str);
if (trace) {
std::cout << trace_str << std::endl;
}
// Generate data.
Data data;
int max_key = (int)(num_entries * key_space) + 1;
for (int i = 0; i < num_entries; ++i) {
Entry e;
e.key = gen_key(max_key);
if (rnd.Next() % 10 != 0) {
e.type = prevalent_entry_type;
} else {
const ValueType types[] = {kTypeValue, kTypeDeletion,
kTypeMerge};
e.type =
types[rnd.Next() % (sizeof(types) / sizeof(types[0]))];
}
e.sequence = i;
e.value = "v" + ToString(i);
ParsedInternalKey internal_key(e.key, e.sequence, e.type);
AppendInternalKey(&e.ikey, internal_key);
data.entries.push_back(e);
}
std::sort(data.entries.begin(), data.entries.end());
if (trace) {
std::cout << "entries:";
for (size_t i = 0; i < data.entries.size(); ++i) {
Entry& e = data.entries[i];
std::cout
<< "\n idx " << i << ": \"" << e.key << "\": \""
<< e.value << "\" seq: " << e.sequence << " type: "
<< (e.type == kTypeValue
? "val"
: e.type == kTypeDeletion ? "del" : "merge");
}
std::cout << std::endl;
}
std::unique_ptr<Iterator> db_iter;
std::unique_ptr<ReferenceIterator> ref_iter;
for (int iteration = 0; iteration < num_iterations; ++iteration) {
SCOPED_TRACE(iteration);
// Create a new iterator every ~30 operations.
if (db_iter == nullptr || rnd.Next() % 30 == 0) {
uint64_t sequence = rnd.Next() % (data.entries.size() + 2);
ref_iter.reset(new ReferenceIterator(&data, sequence));
if (trace) {
std::cout << "new iterator, seq: " << sequence << std::endl;
}
auto internal_iter =
new StressTestIterator(&data, &rnd, BytewiseComparator());
internal_iter->error_probability = error_probability;
internal_iter->mutation_probability = mutation_probability;
internal_iter->target_hidden_fraction =
target_hidden_fraction;
internal_iter->trace = trace;
db_iter.reset(NewDBIterator(
env_, ropt, ImmutableCFOptions(options),
BytewiseComparator(), internal_iter, sequence,
options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
}
// Do a random operation. It's important to do it on ref_iter
// after doing it on db_iter, to make sure ref_iter sees the
// correct recently_touched_keys.
std::string old_key;
bool forward = rnd.Next() % 2 > 0;
// Do Next()/Prev() ~90% of the time.
bool seek = !ref_iter->Valid() || rnd.Next() % 10 == 0;
if (trace) {
std::cout << iteration << ": ";
}
if (!seek) {
assert(db_iter->Valid());
old_key = ref_iter->key;
if (trace) {
std::cout << (forward ? "Next" : "Prev") << std::endl;
}
if (forward) {
db_iter->Next();
ref_iter->Next();
} else {
db_iter->Prev();
ref_iter->Prev();
}
} else {
data.recently_touched_keys.clear();
// Do SeekToFirst less often than Seek.
if (rnd.Next() % 4 == 0) {
if (trace) {
std::cout << (forward ? "SeekToFirst" : "SeekToLast")
<< std::endl;
}
if (forward) {
old_key = "";
db_iter->SeekToFirst();
ref_iter->SeekToFirst();
} else {
old_key = data.entries.back().key;
db_iter->SeekToLast();
ref_iter->SeekToLast();
}
} else {
old_key = gen_key(max_key);
if (trace) {
std::cout << (forward ? "Seek" : "SeekForPrev") << " \""
<< old_key << '"' << std::endl;
}
if (forward) {
db_iter->Seek(old_key);
ref_iter->Seek(old_key);
} else {
db_iter->SeekForPrev(old_key);
ref_iter->SeekForPrev(old_key);
}
}
}
// Check the result.
if (db_iter->Valid()) {
ASSERT_TRUE(db_iter->status().ok());
if (data.recently_touched_keys.count(
db_iter->key().ToString())) {
// Ended on a key that may have been mutated during the
// operation. Reference iterator skips such keys, so we
// can't check the exact result.
// Check that the key moved in the right direction.
if (forward) {
if (seek)
ASSERT_GE(db_iter->key().ToString(), old_key);
else
ASSERT_GT(db_iter->key().ToString(), old_key);
} else {
if (seek)
ASSERT_LE(db_iter->key().ToString(), old_key);
else
ASSERT_LT(db_iter->key().ToString(), old_key);
}
if (ref_iter->Valid()) {
// Check that DBIter didn't miss any non-mutated key.
if (forward) {
ASSERT_LT(db_iter->key().ToString(), ref_iter->key);
} else {
ASSERT_GT(db_iter->key().ToString(), ref_iter->key);
}
}
// Tell the next iteration of the loop to reseek the
// iterators.
ref_iter->valid = false;
++num_recently_removed;
} else {
ASSERT_TRUE(ref_iter->Valid());
ASSERT_EQ(ref_iter->key, db_iter->key().ToString());
ASSERT_EQ(ref_iter->value, db_iter->value());
++num_matching;
}
} else if (db_iter->status().ok()) {
ASSERT_FALSE(ref_iter->Valid());
++num_at_end;
} else {
// Non-ok status. Nothing to check here.
// Tell the next iteration of the loop to reseek the
// iterators.
ref_iter->valid = false;
++num_not_ok;
}
}
}
}
}
}
}
}
// Check that all cases were hit many times.
EXPECT_GT(num_matching, 10000);
EXPECT_GT(num_at_end, 10000);
EXPECT_GT(num_not_ok, 10000);
EXPECT_GT(num_recently_removed, 10000);
std::cout << "stats:\n exact matches: " << num_matching
<< "\n end reached: " << num_at_end
<< "\n non-ok status: " << num_not_ok
<< "\n mutated on the fly: " << num_recently_removed << std::endl;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}


@@ -84,6 +84,36 @@ class TestIterator : public InternalIterator {
});
}
// Removes the key from the set of keys over which this iterator iterates.
// Not to be confused with AddDeletion().
// If the iterator is currently positioned on this key, the deletion will
// apply next time the iterator moves.
// Used for simulating ForwardIterator updating to a new version that doesn't
// have some of the keys (e.g. after compaction with a filter).
void Vanish(std::string _key) {
if (valid_ && data_[iter_].first == _key) {
delete_current_ = true;
return;
}
for (auto it = data_.begin(); it != data_.end(); ++it) {
ParsedInternalKey ikey;
bool ok __attribute__((__unused__)) = ParseInternalKey(it->first, &ikey);
assert(ok);
if (ikey.user_key != _key) {
continue;
}
if (valid_ && data_.begin() + iter_ > it) {
--iter_;
}
data_.erase(it);
return;
}
assert(false);
}
// Number of operations done on this iterator since construction.
size_t steps() const { return steps_; }
virtual bool Valid() const override {
assert(initialized_);
return valid_;
@@ -91,12 +121,16 @@ class TestIterator : public InternalIterator {
virtual void SeekToFirst() override {
assert(initialized_);
++steps_;
DeleteCurrentIfNeeded();
valid_ = (data_.size() > 0);
iter_ = 0;
}
virtual void SeekToLast() override {
assert(initialized_);
++steps_;
DeleteCurrentIfNeeded();
valid_ = (data_.size() > 0);
iter_ = data_.size() - 1;
}
@@ -104,6 +138,7 @@ class TestIterator : public InternalIterator {
virtual void Seek(const Slice& target) override {
assert(initialized_);
SeekToFirst();
++steps_;
if (!valid_) {
return;
}
@@ -119,20 +154,31 @@ class TestIterator : public InternalIterator {
virtual void SeekForPrev(const Slice& target) override {
assert(initialized_);
DeleteCurrentIfNeeded();
SeekForPrevImpl(target, &cmp);
}
virtual void Next() override {
assert(initialized_);
if (data_.empty() || (iter_ == data_.size() - 1)) {
valid_ = false;
assert(valid_);
assert(iter_ < data_.size());
++steps_;
if (delete_current_) {
DeleteCurrentIfNeeded();
} else {
++iter_;
}
valid_ = iter_ < data_.size();
}
virtual void Prev() override {
assert(initialized_);
assert(valid_);
assert(iter_ < data_.size());
++steps_;
DeleteCurrentIfNeeded();
if (iter_ == 0) {
valid_ = false;
} else {
@@ -163,9 +209,19 @@ class TestIterator : public InternalIterator {
bool valid_;
size_t sequence_number_;
size_t iter_;
size_t steps_ = 0;
InternalKeyComparator cmp;
std::vector<std::pair<std::string, std::string>> data_;
bool delete_current_ = false;
void DeleteCurrentIfNeeded() {
if (!delete_current_) {
return;
}
data_.erase(data_.begin() + iter_);
delete_current_ = false;
}
};
class DBIteratorTest : public testing::Test {
@@ -3018,6 +3074,41 @@ TEST_F(DBIteratorTest, SeekLessLowerBound) {
ASSERT_EQ(lower_bound_str, db_iter->key().ToString());
}
TEST_F(DBIteratorTest, ReverseToForwardWithDisappearingKeys) {
Options options;
options.prefix_extractor.reset(NewCappedPrefixTransform(0));
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "A");
internal_iter->AddPut("b", "B");
for (int i = 0; i < 100; ++i) {
internal_iter->AddPut("c" + ToString(i), "");
}
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ReadOptions(), ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
db_iter->SeekForPrev("a");
ASSERT_TRUE(db_iter->Valid());
ASSERT_OK(db_iter->status());
ASSERT_EQ("a", db_iter->key().ToString());
internal_iter->Vanish("a");
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_OK(db_iter->status());
ASSERT_EQ("b", db_iter->key().ToString());
// A (sort of) bug used to cause DBIter to pointlessly drag the internal
// iterator all the way to the end. But this doesn't really matter at the time
// of writing because the only iterator that can see disappearing keys is
// ForwardIterator, which doesn't support SeekForPrev().
EXPECT_LT(internal_iter->steps(), 20);
}
} // namespace rocksdb
int main(int argc, char** argv) {


@@ -2232,6 +2232,46 @@ TEST_P(DBIteratorTest, SeekAfterHittingManyInternalKeys) {
ASSERT_EQ(iter2->value().ToString(), "val_6");
}
// Reproduces a former bug where iterator would skip some records when DBIter
// re-seeks subiterator with Incomplete status.
TEST_P(DBIteratorTest, NonBlockingIterationBugRepro) {
Options options = CurrentOptions();
BlockBasedTableOptions table_options;
// Make sure the sst file has more than one block.
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
// Two records in sst file, each in its own block.
Put("b", "");
Put("d", "");
Flush();
// Create a nonblocking iterator before writing to memtable.
ReadOptions ropt;
ropt.read_tier = kBlockCacheTier;
unique_ptr<Iterator> iter(NewIterator(ropt));
// Overwrite a key in memtable many times to hit
// max_sequential_skip_in_iterations (which is 8 by default).
for (int i = 0; i < 20; ++i) {
Put("c", "");
}
// Load the second block in sst file into the block cache.
{
unique_ptr<Iterator> iter2(NewIterator(ReadOptions()));
iter2->Seek("d");
}
// Finally seek the nonblocking iterator.
iter->Seek("a");
// With the bug, the status used to be OK, and the iterator used to point to
// "d".
EXPECT_TRUE(iter->status().IsIncomplete());
}
INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
testing::Values(true, false));


@@ -27,7 +27,7 @@ namespace rocksdb {
// Usage:
// ForwardLevelIterator iter;
// iter.SetFileIndex(file_index);
// iter.Seek(target);
// iter.Seek(target); // or iter.SeekToFirst();
// iter.Next()
class ForwardLevelIterator : public InternalIterator {
public:
@@ -53,11 +53,11 @@ class ForwardLevelIterator : public InternalIterator {
void SetFileIndex(uint32_t file_index) {
assert(file_index < files_.size());
status_ = Status::OK();
if (file_index != file_index_) {
file_index_ = file_index;
Reset();
}
valid_ = false;
}
void Reset() {
assert(file_index_ < files_.size());
@@ -77,10 +77,10 @@ class ForwardLevelIterator : public InternalIterator {
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
nullptr /* table_reader_ptr */, nullptr, false);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
valid_ = false;
if (!range_del_agg.IsEmpty()) {
status_ = Status::NotSupported(
"Range tombstones unsupported with ForwardIterator");
valid_ = false;
}
}
void SeekToLast() override {
@@ -95,12 +95,27 @@ class ForwardLevelIterator : public InternalIterator {
return valid_;
}
void SeekToFirst() override {
SetFileIndex(0);
assert(file_iter_ != nullptr);
if (!status_.ok()) {
assert(!valid_);
return;
}
file_iter_->SeekToFirst();
valid_ = file_iter_->Valid();
}
void Seek(const Slice& internal_key) override {
assert(file_iter_ != nullptr);
// This deviates from the usual convention for InternalIterator::Seek() in
// that it doesn't discard pre-existing error status. That's because this
// Seek() is only supposed to be called immediately after SetFileIndex()
// (which discards pre-existing error status), and SetFileIndex() may set
// an error status, which we shouldn't discard.
if (!status_.ok()) {
assert(!valid_);
return;
}
file_iter_->Seek(internal_key);
valid_ = file_iter_->Valid();
}
@@ -112,8 +127,12 @@ class ForwardLevelIterator : public InternalIterator {
assert(valid_);
file_iter_->Next();
for (;;) {
if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
valid_ = !file_iter_->status().IsIncomplete();
valid_ = file_iter_->Valid();
if (!file_iter_->status().ok()) {
assert(!valid_);
return;
}
if (valid_) {
return;
}
if (file_index_ + 1 >= files_.size()) {
@@ -121,6 +140,10 @@
return;
}
SetFileIndex(file_index_ + 1);
if (!status_.ok()) {
assert(!valid_);
return;
}
file_iter_->SeekToFirst();
}
}
@@ -135,7 +158,7 @@
Status status() const override {
if (!status_.ok()) {
return status_;
} else if (file_iter_ && !file_iter_->status().ok()) {
} else if (file_iter_) {
return file_iter_->status();
}
return Status::OK();
@@ -299,9 +322,6 @@ bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
}
void ForwardIterator::Seek(const Slice& internal_key) {
if (IsOverUpperBound(internal_key)) {
valid_ = false;
}
if (sv_ == nullptr) {
RebuildIterators(true);
} else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
@@ -605,7 +625,9 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
if ((read_options_.iterate_upper_bound != nullptr) &&
cfd_->internal_comparator().user_comparator()->Compare(
l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
has_iter_trimmed_for_upper_bound_ = true;
// No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
// will never be interested in files with smallest key above
// iterate_upper_bound, since iterate_upper_bound can't be changed.
l0_iters_.push_back(nullptr);
continue;
}
@@ -773,7 +795,7 @@ void ForwardIterator::UpdateCurrent() {
current_ = mutable_iter_;
}
}
valid_ = (current_ != nullptr);
valid_ = current_ != nullptr && immutable_status_.ok();
if (!status_.ok()) {
status_ = Status::OK();
}


@@ -101,9 +101,7 @@ void ManagedIterator::SeekToLast() {
}
assert(mutable_iter_ != nullptr);
mutable_iter_->SeekToLast();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
}
UpdateCurrent();
}
void ManagedIterator::SeekToFirst() {
@@ -146,27 +144,13 @@ void ManagedIterator::Prev() {
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key);
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
RebuildIterator(true);
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Prev now");
return;
}
}
mutable_iter_->Prev();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
status_ = Status::OK();
} else {
status_ = mutable_iter_->status();
}
UpdateCurrent();
}
void ManagedIterator::Next() {
@@ -176,19 +160,10 @@ void ManagedIterator::Next() {
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), cached_key_.Size());
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
RebuildIterator(true);
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Next now");
return;
}
}
mutable_iter_->Next();
UpdateCurrent();
@@ -206,21 +181,38 @@
Status ManagedIterator::status() const { return status_; }
void ManagedIterator::RebuildIterator() {
void ManagedIterator::RebuildIterator(bool reseek) {
std::string current_key;
if (reseek) {
current_key = key().ToString();
}
svnum_ = cfd_->GetSuperVersionNumber();
mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
if (reseek) {
Slice old_key(current_key.data(), current_key.size());
SeekInternal(old_key, false);
UpdateCurrent();
if (!valid_ || key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete(
"Next/Prev failed because current key has "
"been removed");
}
}
}
void ManagedIterator::UpdateCurrent() {
assert(mutable_iter_ != nullptr);
valid_ = mutable_iter_->Valid();
status_ = mutable_iter_->status();
if (!valid_) {
status_ = mutable_iter_->status();
return;
}
status_ = Status::OK();
cached_key_.SetUserKey(mutable_iter_->key());
cached_value_.SetUserKey(mutable_iter_->value());
}


@ -54,7 +54,7 @@ class ManagedIterator : public Iterator {
}
private:
void RebuildIterator();
void RebuildIterator(bool reseek = false);
void UpdateCurrent();
void SeekInternal(const Slice& user_key, bool seek_to_first);
bool NeedToRebuild();


@ -502,13 +502,7 @@ class LevelIterator final : public InternalIterator {
return file_iter_.value();
}
virtual Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!status_.ok()) {
return status_;
} else if (file_iter_.iter() != nullptr) {
return file_iter_.status();
}
return Status::OK();
return file_iter_.iter() ? file_iter_.status() : Status::OK();
}
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override {
@ -573,7 +567,6 @@ class LevelIterator final : public InternalIterator {
RangeDelAggregator* range_del_agg_;
IteratorWrapper file_iter_; // May be nullptr
PinnedIteratorsManager* pinned_iters_mgr_;
Status status_;
};
void LevelIterator::Seek(const Slice& target) {
@ -628,16 +621,9 @@ void LevelIterator::Prev() {
}
void LevelIterator::SkipEmptyFileForward() {
// For an error (IO error, checksum mismatch, etc), we skip the file
// and move to the next one and continue reading data.
// TODO this behavior is from LevelDB. We should revisit it.
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
if (file_iter_.iter() != nullptr && !file_iter_.Valid() &&
file_iter_.iter()->IsOutOfBound()) {
return;
}
(!file_iter_.Valid() && file_iter_.status().ok() &&
!file_iter_.iter()->IsOutOfBound())) {
// Move to next file
if (file_index_ >= flevel_->num_files - 1) {
// Already at the last file
@ -657,7 +643,7 @@ void LevelIterator::SkipEmptyFileForward() {
void LevelIterator::SkipEmptyFileBackward() {
while (file_iter_.iter() == nullptr ||
(!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) {
(!file_iter_.Valid() && file_iter_.status().ok())) {
// Move to previous file
if (file_index_ == 0) {
// Already the first file
@ -672,13 +658,6 @@ void LevelIterator::SkipEmptyFileBackward() {
}
void LevelIterator::SetFileIterator(InternalIterator* iter) {
if (file_iter_.iter() != nullptr && status_.ok()) {
// TODO right now we don't invalidate the iterator even if the status is
// not OK. We should consider to do that so that it is harder for users to
// skip errors.
status_ = file_iter_.status();
}
if (pinned_iters_mgr_ && iter) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}


@ -33,6 +33,7 @@ class Iterator : public Cleanable {
// An iterator is either positioned at a key/value pair, or
// not valid. This method returns true iff the iterator is valid.
// Always returns false if !status().ok().
virtual bool Valid() const = 0;
// Position at the first key in the source. The iterator is Valid()
@ -46,6 +47,9 @@ class Iterator : public Cleanable {
// Position at the first key in the source that is at or past target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
virtual void Seek(const Slice& target) = 0;
// Position at the last key in the source that is at or before target.
@ -72,7 +76,7 @@ class Iterator : public Cleanable {
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: !AtEnd() && !AtStart()
// REQUIRES: Valid()
virtual Slice value() const = 0;
// If an error has occurred, return it. Else return an ok status.

src.mk

@ -267,6 +267,7 @@ MAIN_SOURCES = \
db/db_inplace_update_test.cc \
db/db_io_failure_test.cc \
db/db_iter_test.cc \
db/db_iter_stress_test.cc \
db/db_iterator_test.cc \
db/db_log_iter_test.cc \
db/db_memtable_test.cc \


@ -429,12 +429,13 @@ BlockIter* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
ret_iter = new BlockIter;
}
if (size_ < 2*sizeof(uint32_t)) {
ret_iter->SetStatus(Status::Corruption("bad block contents"));
ret_iter->Invalidate(Status::Corruption("bad block contents"));
return ret_iter;
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
ret_iter->SetStatus(Status::OK());
// Empty block.
ret_iter->Invalidate(Status::OK());
return ret_iter;
} else {
BlockPrefixIndex* prefix_index_ptr =


@ -241,8 +241,21 @@ class BlockIter final : public InternalIterator {
last_bitmap_offset_ = current_ + 1;
}
void SetStatus(Status s) {
// Makes Valid() return false, status() return `s`, and Seek()/Prev()/etc do
// nothing.
void Invalidate(Status s) {
// Assert that the BlockIter is never deleted while Pinning is Enabled.
assert(!pinned_iters_mgr_ ||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
data_ = nullptr;
current_ = restarts_;
status_ = s;
// Clear prev entries cache.
prev_entries_keys_buff_.clear();
prev_entries_.clear();
prev_entries_idx_ = -1;
}
virtual bool Valid() const override { return current_ < restarts_; }


@ -1391,7 +1391,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
if (cache_handle == nullptr && no_io) {
if (input_iter != nullptr) {
input_iter->SetStatus(Status::Incomplete("no blocking io"));
input_iter->Invalidate(Status::Incomplete("no blocking io"));
return input_iter;
} else {
return NewErrorInternalIterator(Status::Incomplete("no blocking io"));
@ -1438,7 +1438,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
// make sure if something goes wrong, index_reader shall remain intact.
if (input_iter != nullptr) {
input_iter->SetStatus(s);
input_iter->Invalidate(s);
return input_iter;
} else {
return NewErrorInternalIterator(s);
@ -1506,7 +1506,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
if (s.ok() && block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
iter->SetStatus(Status::Incomplete("no blocking io"));
iter->Invalidate(Status::Incomplete("no blocking io"));
return iter;
}
std::unique_ptr<Block> block_value;
@ -1566,7 +1566,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
}
} else {
assert(block.value == nullptr);
iter->SetStatus(s);
iter->Invalidate(s);
}
return iter;
}
@ -1876,7 +1876,9 @@ void BlockBasedTableIterator::InitDataBlock() {
BlockHandle data_block_handle;
Slice handle_slice = index_iter_->value();
if (!block_iter_points_to_real_block_ ||
handle_slice.compare(prev_index_value_) != 0) {
handle_slice.compare(prev_index_value_) != 0 ||
// if previous attempt of reading the block missed cache, try again
data_block_iter_.status().IsIncomplete()) {
if (block_iter_points_to_real_block_) {
ResetDataIter();
}


@ -528,11 +528,9 @@ class BlockBasedTableIterator : public InternalIterator {
return data_block_iter_.value();
}
Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!index_iter_->status().ok()) {
return index_iter_->status();
} else if (block_iter_points_to_real_block_ &&
!data_block_iter_.status().ok()) {
} else if (block_iter_points_to_real_block_) {
return data_block_iter_.status();
} else {
return Status::OK();
@ -571,8 +569,7 @@ class BlockBasedTableIterator : public InternalIterator {
if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
data_block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
}
data_block_iter_.~BlockIter();
new (&data_block_iter_) BlockIter();
data_block_iter_.Invalidate(Status::OK());
block_iter_points_to_real_block_ = false;
}
}


@ -206,7 +206,7 @@ class CuckooTableIterator : public InternalIterator {
void Prev() override;
Slice key() const override;
Slice value() const override;
Status status() const override { return status_; }
Status status() const override { return Status::OK(); }
void InitIfNeeded();
private:
@ -241,7 +241,6 @@ class CuckooTableIterator : public InternalIterator {
void PrepareKVAtCurrIdx();
CuckooTableReader* reader_;
bool initialized_;
Status status_;
// Contains a map of keys to bucket_id sorted in key order.
std::vector<uint32_t> sorted_bucket_ids_;
// We assume that the number of items can be stored in uint32 (4 Billion).


@ -22,6 +22,7 @@ class InternalIterator : public Cleanable {
// An iterator is either positioned at a key/value pair, or
// not valid. This method returns true iff the iterator is valid.
// Always returns false if !status().ok().
virtual bool Valid() const = 0;
// Position at the first key in the source. The iterator is Valid()
@ -35,6 +36,9 @@ class InternalIterator : public Cleanable {
// Position at the first key in the source that is at or past target
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
virtual void Seek(const Slice& target) = 0;
// Position at the first key in the source that is at or before target
@ -61,7 +65,7 @@ class InternalIterator : public Cleanable {
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: !AtEnd() && !AtStart()
// REQUIRES: Valid()
virtual Slice value() const = 0;
// If an error has occurred, return it. Else return an ok status.


@ -87,6 +87,7 @@ class IteratorWrapper {
valid_ = iter_->Valid();
if (valid_) {
key_ = iter_->key();
assert(iter_->status().ok());
}
}


@ -52,12 +52,21 @@ class MergingIterator : public InternalIterator {
}
for (auto& child : children_) {
if (child.Valid()) {
assert(child.status().ok());
minHeap_.push(&child);
} else {
considerStatus(child.status());
}
}
current_ = CurrentForward();
}
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = s;
}
}
virtual void AddIterator(InternalIterator* iter) {
assert(direction_ == kForward);
children_.emplace_back(iter);
@ -66,8 +75,11 @@ class MergingIterator : public InternalIterator {
}
auto& new_wrapper = children_.back();
if (new_wrapper.Valid()) {
assert(new_wrapper.status().ok());
minHeap_.push(&new_wrapper);
current_ = CurrentForward();
} else {
considerStatus(new_wrapper.status());
}
}
@ -77,14 +89,22 @@ class MergingIterator : public InternalIterator {
}
}
virtual bool Valid() const override { return (current_ != nullptr); }
virtual bool Valid() const override {
return current_ != nullptr && status_.ok();
}
virtual Status status() const override { return status_; }
virtual void SeekToFirst() override {
ClearHeaps();
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToFirst();
if (child.Valid()) {
assert(child.status().ok());
minHeap_.push(&child);
} else {
considerStatus(child.status());
}
}
direction_ = kForward;
@ -94,10 +114,14 @@ class MergingIterator : public InternalIterator {
virtual void SeekToLast() override {
ClearHeaps();
InitMaxHeap();
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToLast();
if (child.Valid()) {
assert(child.status().ok());
maxHeap_->push(&child);
} else {
considerStatus(child.status());
}
}
direction_ = kReverse;
@ -106,6 +130,7 @@ class MergingIterator : public InternalIterator {
virtual void Seek(const Slice& target) override {
ClearHeaps();
status_ = Status::OK();
for (auto& child : children_) {
{
PERF_TIMER_GUARD(seek_child_seek_time);
@ -114,8 +139,11 @@ class MergingIterator : public InternalIterator {
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) {
assert(child.status().ok());
PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child);
} else {
considerStatus(child.status());
}
}
direction_ = kForward;
@ -128,6 +156,7 @@ class MergingIterator : public InternalIterator {
virtual void SeekForPrev(const Slice& target) override {
ClearHeaps();
InitMaxHeap();
status_ = Status::OK();
for (auto& child : children_) {
{
@ -137,8 +166,11 @@ class MergingIterator : public InternalIterator {
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) {
assert(child.status().ok());
PERF_TIMER_GUARD(seek_max_heap_time);
maxHeap_->push(&child);
} else {
considerStatus(child.status());
}
}
direction_ = kReverse;
@ -172,9 +204,11 @@ class MergingIterator : public InternalIterator {
// current is still valid after the Next() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
assert(current_->status().ok());
minHeap_.replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
considerStatus(current_->status());
minHeap_.pop();
}
current_ = CurrentForward();
@ -191,28 +225,35 @@ class MergingIterator : public InternalIterator {
// just after the if-block.
ClearHeaps();
InitMaxHeap();
Slice target = key();
for (auto& child : children_) {
if (&child != current_) {
if (!prefix_seek_mode_) {
child.Seek(key());
child.Seek(target);
if (child.Valid()) {
// Child is at first entry >= key(). Step back one to be < key()
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
&child);
assert(child.status().ok());
child.Prev();
} else {
// Child has no entries >= key(). Position at last entry.
TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
considerStatus(child.status());
child.SeekToLast();
}
considerStatus(child.status());
} else {
child.SeekForPrev(key());
if (child.Valid() && comparator_->Equal(key(), child.key())) {
child.SeekForPrev(target);
considerStatus(child.status());
if (child.Valid() && comparator_->Equal(target, child.key())) {
child.Prev();
considerStatus(child.status());
}
}
}
if (child.Valid()) {
assert(child.status().ok());
maxHeap_->push(&child);
}
}
@ -238,9 +279,11 @@ class MergingIterator : public InternalIterator {
// current is still valid after the Prev() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
assert(current_->status().ok());
maxHeap_->replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
considerStatus(current_->status());
maxHeap_->pop();
}
current_ = CurrentReverse();
@ -256,17 +299,6 @@ class MergingIterator : public InternalIterator {
return current_->value();
}
virtual Status status() const override {
Status s;
for (auto& child : children_) {
s = child.status();
if (!s.ok()) {
break;
}
}
return s;
}
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
@ -302,6 +334,8 @@ class MergingIterator : public InternalIterator {
// child iterators are valid. This is the top of minHeap_ or maxHeap_
// depending on the direction.
IteratorWrapper* current_;
// If any of the children have non-ok status, this is one of them.
Status status_;
// Which direction is the iterator moving?
enum Direction {
kForward,
@ -334,11 +368,14 @@ void MergingIterator::SwitchToForward() {
// Otherwise, advance the non-current children. We advance current_
// just after the if-block.
ClearHeaps();
Slice target = key();
for (auto& child : children_) {
if (&child != current_) {
child.Seek(key());
if (child.Valid() && comparator_->Equal(key(), child.key())) {
child.Seek(target);
considerStatus(child.status());
if (child.Valid() && comparator_->Equal(target, child.key())) {
child.Next();
considerStatus(child.status());
}
}
if (child.Valid()) {


@ -625,6 +625,7 @@ bool PlainTableIterator::Valid() const {
}
void PlainTableIterator::SeekToFirst() {
status_ = Status::OK();
next_offset_ = table_->data_start_offset_;
if (next_offset_ >= table_->file_info_.data_end_offset) {
next_offset_ = offset_ = table_->file_info_.data_end_offset;
@ -636,6 +637,7 @@ void PlainTableIterator::SeekToFirst() {
void PlainTableIterator::SeekToLast() {
assert(false);
status_ = Status::NotSupported("SeekToLast() is not supported in PlainTable");
next_offset_ = offset_ = table_->file_info_.data_end_offset;
}
void PlainTableIterator::Seek(const Slice& target) {
@ -676,6 +678,7 @@ void PlainTableIterator::Seek(const Slice& target) {
if (!table_->IsTotalOrderMode()) {
prefix_hash = GetSliceHash(prefix_slice);
if (!table_->MatchBloom(prefix_hash)) {
status_ = Status::OK();
offset_ = next_offset_ = table_->file_info_.data_end_offset;
return;
}
@ -711,6 +714,7 @@ void PlainTableIterator::SeekForPrev(const Slice& /*target*/) {
assert(false);
status_ =
Status::NotSupported("SeekForPrev() is not supported in PlainTable");
offset_ = next_offset_ = table_->file_info_.data_end_offset;
}
void PlainTableIterator::Next() {


@ -256,7 +256,7 @@ class KeyConvertingIterator : public InternalIterator {
delete iter_;
}
}
virtual bool Valid() const override { return iter_->Valid(); }
virtual bool Valid() const override { return iter_->Valid() && status_.ok(); }
virtual void Seek(const Slice& target) override {
ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
std::string encoded;
@ -2368,6 +2368,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
iter->Next();
}
ASSERT_OK(iter->status());
iter.reset();
const ImmutableCFOptions ioptions1(opt);
ASSERT_OK(c.Reopen(ioptions1));


@ -47,8 +47,8 @@ class TwoLevelIterator : public InternalIterator {
return second_level_iter_.value();
}
virtual Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!first_level_iter_.status().ok()) {
assert(second_level_iter_.iter() == nullptr);
return first_level_iter_.status();
} else if (second_level_iter_.iter() != nullptr &&
!second_level_iter_.status().ok()) {
@ -101,7 +101,7 @@ void TwoLevelIterator::SeekForPrev(const Slice& target) {
second_level_iter_.SeekForPrev(target);
}
if (!Valid()) {
if (!first_level_iter_.Valid()) {
if (!first_level_iter_.Valid() && first_level_iter_.status().ok()) {
first_level_iter_.SeekToLast();
InitDataBlock();
if (second_level_iter_.iter() != nullptr) {
@ -144,8 +144,7 @@ void TwoLevelIterator::Prev() {
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (second_level_iter_.iter() == nullptr ||
(!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
(!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
// Move to next block
if (!first_level_iter_.Valid()) {
SetSecondLevelIterator(nullptr);
@ -161,8 +160,7 @@ void TwoLevelIterator::SkipEmptyDataBlocksForward() {
void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
while (second_level_iter_.iter() == nullptr ||
(!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
(!second_level_iter_.Valid() && second_level_iter_.status().ok())) {
// Move to next block
if (!first_level_iter_.Valid()) {
SetSecondLevelIterator(nullptr);
@ -177,9 +175,6 @@ void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
}
void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
if (second_level_iter_.iter() != nullptr) {
SaveError(second_level_iter_.status());
}
InternalIterator* old_iter = second_level_iter_.Set(iter);
delete old_iter;
}


@ -119,6 +119,7 @@ class BlobDBIterator : public Iterator {
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
value_.Reset();
status_ = Status::OK();
if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
if (s.IsNotFound()) {


@ -79,6 +79,7 @@ class BaseDeltaIterator : public Iterator {
void Next() override {
if (!Valid()) {
status_ = Status::NotSupported("Next() on invalid iterator");
return;
}
if (!forward_) {
@ -114,6 +115,7 @@ class BaseDeltaIterator : public Iterator {
void Prev() override {
if (!Valid()) {
status_ = Status::NotSupported("Prev() on invalid iterator");
return;
}
if (forward_) {
@ -170,6 +172,21 @@ class BaseDeltaIterator : public Iterator {
private:
void AssertInvariants() {
#ifndef NDEBUG
bool not_ok = false;
if (!base_iterator_->status().ok()) {
assert(!base_iterator_->Valid());
not_ok = true;
}
if (!delta_iterator_->status().ok()) {
assert(!delta_iterator_->Valid());
not_ok = true;
}
if (not_ok) {
assert(!Valid());
assert(!status().ok());
return;
}
if (!Valid()) {
return;
}
@ -238,13 +255,25 @@ class BaseDeltaIterator : public Iterator {
void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
status_ = Status::OK();
while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
assert(delta_iterator_->status().ok());
delta_entry = delta_iterator_->Entry();
} else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = false;
return;
}
equal_keys_ = false;
if (!BaseValid()) {
if (!base_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = true;
return;
}
// Base has finished.
if (!DeltaValid()) {
// Finished