Make BaseDeltaIterator honor allow_unprepared_value (#13111)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13111

As a follow-up to https://github.com/facebook/rocksdb/pull/13105, the patch changes `BaseDeltaIterator` so that it honors the read option `allow_unprepared_value`. When the option is set and the `BaseDeltaIterator` lands on the base iterator, it defers calling `PrepareValue` on the base iterator and setting `value()` and `columns()` until `PrepareValue` is called on the `BaseDeltaIterator` itself.

Reviewed By: jowlyzhang

Differential Revision: D65344764

fbshipit-source-id: d79c77b5de7c690bf2deeff435e9b0a9065f6c5c
This commit is contained in:
Levi Tamasi 2024-11-01 14:10:18 -07:00 committed by Facebook GitHub Bot
parent 7c98a2d130
commit 1006eddd63
5 changed files with 134 additions and 8 deletions

View File

@ -0,0 +1 @@
`BaseDeltaIterator` now honors the read option `allow_unprepared_value`.

View File

@ -329,7 +329,8 @@ Iterator* WriteBatchWithIndex::NewIteratorWithBase(
}
return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
GetColumnFamilyUserComparator(column_family));
GetColumnFamilyUserComparator(column_family),
read_options);
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
@ -337,7 +338,8 @@ Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
&rep->comparator);
return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
rep->comparator.default_comparator());
rep->comparator.default_comparator(),
/* read_options */ nullptr);
}
Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,

View File

@ -22,10 +22,13 @@ namespace ROCKSDB_NAMESPACE {
BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator)
const Comparator* comparator,
const ReadOptions* read_options)
: forward_(true),
current_at_base_(true),
equal_keys_(false),
allow_unprepared_value_(
read_options ? read_options->allow_unprepared_value : false),
status_(Status::OK()),
column_family_(column_family),
base_iterator_(base_iterator),
@ -424,7 +427,9 @@ void BaseDeltaIterator::UpdateCurrent() {
} else if (!DeltaValid()) {
// Delta has finished.
current_at_base_ = true;
SetValueAndColumnsFromBase();
if (!allow_unprepared_value_) {
SetValueAndColumnsFromBase();
}
return;
} else {
int compare =
@ -448,7 +453,9 @@ void BaseDeltaIterator::UpdateCurrent() {
}
} else {
current_at_base_ = true;
SetValueAndColumnsFromBase();
if (!allow_unprepared_value_) {
SetValueAndColumnsFromBase();
}
return;
}
}

View File

@ -36,7 +36,8 @@ class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator);
const Comparator* comparator,
const ReadOptions* read_options);
~BaseDeltaIterator() override;
@ -47,6 +48,23 @@ class BaseDeltaIterator : public Iterator {
void SeekForPrev(const Slice& k) override;
void Next() override;
void Prev() override;
bool PrepareValue() override {
assert(Valid());
if (!allow_unprepared_value_) {
return true;
}
if (!current_at_base_) {
return true;
}
SetValueAndColumnsFromBase();
return Valid();
}
Slice key() const override;
Slice value() const override { return value_; }
const WideColumns& columns() const override { return columns_; }
@ -69,6 +87,7 @@ class BaseDeltaIterator : public Iterator {
bool forward_;
bool current_at_base_;
bool equal_keys_;
bool allow_unprepared_value_;
Status status_;
ColumnFamilyHandle* column_family_;
std::unique_ptr<Iterator> base_iterator_;

View File

@ -1742,8 +1742,6 @@ TEST_P(WriteBatchWithIndexTest, TestNewIteratorWithBaseFromWbwi) {
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) {
// BaseDeltaIterator by default should call PrepareValue if it lands on the
// base iterator in case it was created with allow_unprepared_value=true.
// (Note that BaseDeltaIterator itself does not support allow_unprepared_value
// yet but the base iterator might have been created with the flag set.)
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}};
@ -1814,6 +1812,105 @@ TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) {
}
}
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseAllowUnpreparedValue) {
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}};
ASSERT_OK(batch_->Put(&cf1, "c", "cc1"));
ReadOptions read_options = read_opts_;
read_options.allow_unprepared_value = true;
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1, new KVIter(&map, /* allow_unprepared_value */ true),
&read_options));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "aa");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
// This key is served out of the delta iterator so this PrepareValue() is a
// no-op
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "cc1");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "ee");
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "ee");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
// This key is served out of the delta iterator so this PrepareValue() is a
// no-op
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "cc1");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "aa");
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
// PrepareValue failures from the base iterator should be propagated
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1,
new KVIter(&map, /* allow_unprepared_value */ true,
/* fail_prepare_value */ true),
&read_options));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->PrepareValue());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->PrepareValue());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
}
}
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseMergePrepareValue) {
// When performing a merge across the base and delta iterators,
// BaseDeltaIterator should call PrepareValue on the base iterator in case it