From ef535039f3010cbc2dd0cb3746b737955c55242c Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Thu, 31 Oct 2024 14:20:33 -0700 Subject: [PATCH] Call PrepareValue on the base iterator in BaseDeltaIterator (#13105) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/13105 The `WriteBatchWithIndex::NewIteratorWithBase` interface enables creating a `BaseDeltaIterator` with an arbitrary base iterator passed in by the client, which has potentially been created with the `allow_unprepared_value` read option set. Because of this, `BaseDeltaIterator` has to call `PrepareValue` before using the `value()` or `columns()` from the base iterator. This includes both the case when `BaseDeltaIterator` exposes the `value()` and `columns()` of the base iterator as is and the case when the final `value()` / `columns()` is a result of merging key-values across the base and delta iterators. Note that `BaseDeltaIterator` itself does not support `allow_unprepared_value` yet; this will be implemented in an upcoming patch. Reviewed By: jowlyzhang Differential Revision: D65249643 fbshipit-source-id: b0a1ccc0dfd31105b2eef167b463ed15a8bb83b7 --- .../base_delta_iterator_prepare_value.md | 1 + .../write_batch_with_index_internal.cc | 22 ++ .../write_batch_with_index_test.cc | 240 +++++++++++++++++- 3 files changed, 254 insertions(+), 9 deletions(-) create mode 100644 unreleased_history/bug_fixes/base_delta_iterator_prepare_value.md diff --git a/unreleased_history/bug_fixes/base_delta_iterator_prepare_value.md b/unreleased_history/bug_fixes/base_delta_iterator_prepare_value.md new file mode 100644 index 0000000000..3cc28d3c6f --- /dev/null +++ b/unreleased_history/bug_fixes/base_delta_iterator_prepare_value.md @@ -0,0 +1 @@ +`BaseDeltaIterator` now calls `PrepareValue` on the base iterator in case it has been created with the `allow_unprepared_value` read option set. Earlier, such base iterators could lead to incorrect values being exposed from `BaseDeltaIterator`. 
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index e2e60ab6df..1e52686c06 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -262,6 +262,17 @@ void BaseDeltaIterator::SetValueAndColumnsFromBase() { assert(value_.empty()); assert(columns_.empty()); + if (!base_iterator_->PrepareValue()) { + assert(!BaseValid()); + assert(!base_iterator_->status().ok()); + + Invalidate(base_iterator_->status()); + + assert(!Valid()); + assert(!status().ok()); + return; + } + value_ = base_iterator_->value(); columns_ = base_iterator_->columns(); } @@ -314,6 +325,17 @@ void BaseDeltaIterator::SetValueAndColumnsFromDelta() { /* result_operand */ nullptr, &result_type); } else if (delta_entry.type == kMergeRecord) { if (equal_keys_) { + if (!base_iterator_->PrepareValue()) { + assert(!BaseValid()); + assert(!base_iterator_->status().ok()); + + Invalidate(base_iterator_->status()); + + assert(!Valid()); + assert(!status().ok()); + return; + } + if (WideColumnsHelper::HasDefaultColumnOnly(base_iterator_->columns())) { status_ = WriteBatchWithIndexInternal::MergeKeyWithBaseValue( column_family_, delta_entry.key, MergeHelper::kPlainBaseValue, diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index d706682a5f..4d07eb60e3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -81,73 +81,116 @@ using KVMap = std::map; class KVIter : public Iterator { public: - explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} + explicit KVIter(const KVMap* map, bool allow_unprepared_value = false, + bool fail_prepare_value = false) + : map_(map), + iter_(map_->end()), + 
allow_unprepared_value_(allow_unprepared_value), + fail_prepare_value_(fail_prepare_value) {} - bool Valid() const override { return iter_ != map_->end(); } + bool Valid() const override { return status_.ok() && iter_ != map_->end(); } void SeekToFirst() override { + status_ = Status::OK(); + Reset(); + iter_ = map_->begin(); - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } void SeekToLast() override { + status_ = Status::OK(); + Reset(); + if (map_->empty()) { iter_ = map_->end(); } else { iter_ = map_->find(map_->rbegin()->first); } - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } void Seek(const Slice& k) override { + status_ = Status::OK(); + Reset(); + iter_ = map_->lower_bound(k.ToString()); - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } void SeekForPrev(const Slice& k) override { + status_ = Status::OK(); + Reset(); + iter_ = map_->upper_bound(k.ToString()); Prev(); - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } void Next() override { + Reset(); + ++iter_; - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } void Prev() override { + Reset(); + if (iter_ == map_->begin()) { iter_ = map_->end(); return; } --iter_; - if (Valid()) { + if (Valid() && !allow_unprepared_value_) { Update(); } } + bool PrepareValue() override { + assert(Valid()); + + if (!allow_unprepared_value_) { + return true; + } + + if (fail_prepare_value_) { + status_ = Status::Corruption("PrepareValue() failed"); + return false; + } + + Update(); + + return true; + } + Slice key() const override { return iter_->first; } Slice value() const override { return value_; } const WideColumns& columns() const override { return columns_; } - Status status() const override { return Status::OK(); } + Status status() const override { return status_; } private: + void Reset() { + value_.clear(); + columns_.clear(); + } + void Update() { assert(Valid()); @@ -157,8 +200,11 @@ 
class KVIter : public Iterator { const KVMap* const map_; KVMap::const_iterator iter_; + Status status_; Slice value_; WideColumns columns_; + bool allow_unprepared_value_; + bool fail_prepare_value_; }; static std::string PrintContents(WriteBatchWithIndex* batch, @@ -1693,6 +1739,182 @@ TEST_P(WriteBatchWithIndexTest, TestNewIteratorWithBaseFromWbwi) { ASSERT_OK(iter->status()); } +TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) { + // BaseDeltaIterator by default should call PrepareValue if it lands on the + // base iterator in case it was created with allow_unprepared_value=true. + // (Note that BaseDeltaIterator itself does not support allow_unprepared_value + // yet but the base iterator might have been created with the flag set.) + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}}; + + ASSERT_OK(batch_->Put(&cf1, "c", "cc1")); + + { + std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( + &cf1, new KVIter(&map, /* allow_unprepared_value */ true))); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "a"); + ASSERT_EQ(iter->value(), "aa"); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "c"); + ASSERT_EQ(iter->value(), "cc1"); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "e"); + ASSERT_EQ(iter->value(), "ee"); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "e"); + ASSERT_EQ(iter->value(), "ee"); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "c"); + ASSERT_EQ(iter->value(), "cc1"); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "a"); + ASSERT_EQ(iter->value(), "aa"); + + 
iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + // PrepareValue failures from the base iterator should be propagated + { + std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( + &cf1, new KVIter(&map, /* allow_unprepared_value */ true, + /* fail_prepare_value */ true))); + + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsCorruption()); + + iter->SeekToLast(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsCorruption()); + } +} + +TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseMergePrepareValue) { + // When performing a merge across the base and delta iterators, + // BaseDeltaIterator should call PrepareValue on the base iterator in case it + // was created with allow_unprepared_value=true. (Note: we use BlobDB here + // to ensure PrepareValue is not a no-op.) + options_.enable_blob_files = true; + + ASSERT_OK(OpenDB()); + + ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "a", "aa")); + ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "c", "cc")); + ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "e", "ee")); + ASSERT_OK(db_->Flush(FlushOptions(), db_->DefaultColumnFamily())); + + ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "a", "aa1")); + ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "c", "cc1")); + ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "e", "ee1")); + + ReadOptions db_read_options = read_opts_; + db_read_options.allow_unprepared_value = true; + + { + std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( + db_->DefaultColumnFamily(), + db_->NewIterator(db_read_options, db_->DefaultColumnFamily()), + &read_opts_)); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "a"); + ASSERT_EQ(iter->value(), "aa,aa1"); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "c"); + ASSERT_EQ(iter->value(), "cc,cc1"); + 
iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "e"); + ASSERT_EQ(iter->value(), "ee,ee1"); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "e"); + ASSERT_EQ(iter->value(), "ee,ee1"); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "c"); + ASSERT_EQ(iter->value(), "cc,cc1"); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "a"); + ASSERT_EQ(iter->value(), "aa,aa1"); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:TamperWithResult", [](void* arg) { + Slice* const blob_index = static_cast<Slice*>(arg); + assert(blob_index); + assert(!blob_index->empty()); + blob_index->remove_prefix(1); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // PrepareValue failures from the base iterator should be propagated + { + std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase( + db_->DefaultColumnFamily(), + db_->NewIterator(db_read_options, db_->DefaultColumnFamily()), + &read_opts_)); + + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsCorruption()); + + iter->SeekToLast(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsCorruption()); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(WriteBatchWithIndexTest, TestBoundsCheckingInDeltaIterator) { Status s = OpenDB(); ASSERT_OK(s);