Re-generate WriteEntry on WBWIIterator::Entry()

Summary:
[This is the resubmit of D39813. Tests were failing, so I reverted the diff. I found the bug and I'm now resubmitting]

If we don't do this, any calls to Entry() after WBWI mutation will result in undefined behavior. We need to re-fetch the offset from the skip list and regenerate the new pointer (because string's base pointer can change while mutating).

Test Plan: COMPILE_WITH_ASAN=1 make write_batch_with_index_test && ./write_batch_with_index_test

Reviewers: sdong

Reviewed By: sdong

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D39897
This commit is contained in:
Igor Canadi 2015-06-10 12:57:38 -07:00
parent d03f110904
commit 821cff114e
4 changed files with 56 additions and 79 deletions

View File

@ -11,6 +11,7 @@
* DB::CompactRange()'s parameter reduce_level is changed to change_level, to allow users to move levels to lower levels if allowed. It can be used to migrate a DB from options.level_compaction_dynamic_level_bytes=false to options.level_compaction_dynamic_level_bytes.true.
* Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2.
* If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost.
* WBWIIterator::Entry() now returns WriteEntry instead of `const WriteEntry&`
## 3.11.0 (5/19/2015)
### New Features

View File

@ -55,7 +55,9 @@ class WBWIIterator {
virtual void Prev() = 0;
virtual const WriteEntry& Entry() const = 0;
// the return WriteEntry is only valid until the next mutation of
// WriteBatchWithIndex
virtual WriteEntry Entry() const = 0;
virtual Status status() const = 0;
};

View File

@ -89,7 +89,8 @@ class BaseDeltaIterator : public Iterator {
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (Compare() == 0) {
if (comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key()) == 0) {
equal_keys_ = true;
}
}
@ -123,7 +124,8 @@ class BaseDeltaIterator : public Iterator {
AdvanceBase();
}
if (DeltaValid() && BaseValid()) {
if (Compare() == 0) {
if (comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key()) == 0) {
equal_keys_ = true;
}
}
@ -153,23 +155,6 @@ class BaseDeltaIterator : public Iterator {
}
private:
// -1 -- delta less advanced than base
// 0 -- delta == base
// 1 -- delta more advanced than base
int Compare() const {
assert(delta_iterator_->Valid() && base_iterator_->Valid());
int cmp = comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key());
if (forward_) {
return cmp;
} else {
return -cmp;
}
}
bool IsDeltaDelete() {
assert(DeltaValid());
return delta_iterator_->Entry().type == kDeleteRecord;
}
void AssertInvariants() {
#ifndef NDEBUG
if (!Valid()) {
@ -239,6 +224,10 @@ class BaseDeltaIterator : public Iterator {
bool DeltaValid() const { return delta_iterator_->Valid(); }
void UpdateCurrent() {
while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
delta_entry = delta_iterator_->Entry();
}
equal_keys_ = false;
if (!BaseValid()) {
// Base has finished.
@ -246,7 +235,7 @@ class BaseDeltaIterator : public Iterator {
// Finished
return;
}
if (IsDeltaDelete()) {
if (delta_entry.type == kDeleteRecord) {
AdvanceDelta();
} else {
current_at_base_ = false;
@ -257,12 +246,14 @@ class BaseDeltaIterator : public Iterator {
current_at_base_ = true;
return;
} else {
int compare = Compare();
int compare =
(forward_ ? 1 : -1) *
comparator_->Compare(delta_entry.key, base_iterator_->key());
if (compare <= 0) { // delta bigger or equal
if (compare == 0) {
equal_keys_ = true;
}
if (!IsDeltaDelete()) {
if (delta_entry.type != kDeleteRecord) {
current_at_base_ = false;
return;
}
@ -300,23 +291,26 @@ class WBWIIteratorImpl : public WBWIIterator {
const ReadableWriteBatch* write_batch)
: column_family_id_(column_family_id),
skip_list_iter_(skip_list),
write_batch_(write_batch),
valid_(false) {}
write_batch_(write_batch) {}
virtual ~WBWIIteratorImpl() {}
virtual bool Valid() const override { return valid_; }
virtual bool Valid() const override {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return (iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
}
virtual void SeekToFirst() override {
valid_ = true;
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_);
skip_list_iter_.Seek(&search_entry);
ReadEntry();
}
virtual void SeekToLast() override {
valid_ = true;
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_ + 1);
skip_list_iter_.Seek(&search_entry);
@ -325,30 +319,38 @@ class WBWIIteratorImpl : public WBWIIterator {
} else {
skip_list_iter_.Prev();
}
ReadEntry();
}
virtual void Seek(const Slice& key) override {
valid_ = true;
WriteBatchIndexEntry search_entry(&key, column_family_id_);
skip_list_iter_.Seek(&search_entry);
ReadEntry();
}
virtual void Next() override {
skip_list_iter_.Next();
ReadEntry();
virtual void Next() override { skip_list_iter_.Next(); }
virtual void Prev() override { skip_list_iter_.Prev(); }
virtual WriteEntry Entry() const override {
WriteEntry ret;
Slice blob;
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
// this is guaranteed with Valid()
assert(iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type,
&ret.key, &ret.value, &blob);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kMergeRecord);
return ret;
}
virtual void Prev() override {
skip_list_iter_.Prev();
ReadEntry();
virtual Status status() const override {
// this is in-memory data structure, so the only way status can be non-ok is
// through memory corruption
return Status::OK();
}
virtual const WriteEntry& Entry() const override { return current_; }
virtual Status status() const override { return status_; }
const WriteBatchIndexEntry* GetRawEntry() const {
return skip_list_iter_.key();
}
@ -357,33 +359,6 @@ class WBWIIteratorImpl : public WBWIIterator {
uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_;
Status status_;
bool valid_;
WriteEntry current_;
void ReadEntry() {
if (!status_.ok() || !skip_list_iter_.Valid()) {
valid_ = false;
return;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
if (iter_entry == nullptr ||
iter_entry->column_family != column_family_id_) {
valid_ = false;
return;
}
Slice blob;
status_ = write_batch_->GetEntryFromDataOffset(
iter_entry->offset, &current_.type, &current_.key, &current_.value,
&blob);
if (!status_.ok()) {
valid_ = false;
} else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
current_.type != kMergeRecord) {
valid_ = false;
status_ = Status::Corruption("write batch index is corrupted");
}
}
};
struct WriteBatchWithIndex::Rep {

View File

@ -11,6 +11,7 @@
#include <memory>
#include <map>
#include "db/column_family.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/string_util.h"
#include "util/testharness.h"
@ -103,7 +104,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
iter->Seek(e.key);
ASSERT_OK(iter->status());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(e.key, write_entry.key.ToString());
ASSERT_EQ(e.value, write_entry.value.ToString());
batch->Delete(&data, e.key);
@ -124,7 +125,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v : pair.second) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) {
@ -140,7 +141,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ((*v)->type, write_entry.type);
if (write_entry.type != kDeleteRecord) {
@ -165,7 +166,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v : pair.second) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString());
if (v->type != kDeleteRecord) {
ASSERT_EQ(v->key, write_entry.value.ToString());
@ -182,7 +183,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
if ((*v)->type != kDeleteRecord) {
ASSERT_EQ((*v)->key, write_entry.value.ToString());
@ -204,7 +205,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
ASSERT_OK(iter->status());
for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) {
@ -226,7 +227,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
ASSERT_OK(iter->status());
for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->value, write_entry.key.ToString());
if (v->type != kDeleteRecord) {
@ -1268,9 +1269,6 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) {
AssertIterKey("mm", iter.get());
AssertIterValue("kk", iter.get());
batch.Delete("mm");
// still mm even though it's deleted
AssertIterKey("mm", iter.get());
AssertIterValue("kk", iter.get());
iter->Next();
AssertIterKey("n", iter.get());
iter->Prev();
@ -1368,6 +1366,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) {
} // namespace
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}