Call PrepareValue on the base iterator in BaseDeltaIterator (#13105)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13105

The `WriteBatchWithIndex::NewIteratorWithBase` interface enables creating a `BaseDeltaIterator` with an arbitrary base iterator passed in by the client, which has potentially been created with the `allow_unprepared_value` read option set. Because of this, `BaseDeltaIterator` has to call `PrepareValue` before using the `value()` or `columns()` from the base iterator. This includes both the case when `BaseDeltaIterator` exposes the `value()` and `columns()` of the base iterator as is and the case when the final `value()` / `columns()` is a result of merging key-values across the base and delta iterators. Note that `BaseDeltaIterator` itself does not support `allow_unprepared_value` yet; this will be implemented in an upcoming patch.

Reviewed By: jowlyzhang

Differential Revision: D65249643

fbshipit-source-id: b0a1ccc0dfd31105b2eef167b463ed15a8bb83b7
This commit is contained in:
Levi Tamasi 2024-10-31 14:20:33 -07:00 committed by Facebook GitHub Bot
parent 1987313a94
commit ef535039f3
3 changed files with 254 additions and 9 deletions

View File

@ -0,0 +1 @@
`BaseDeltaIterator` now calls `PrepareValue` on the base iterator in case it has been created with the `allow_unprepared_value` read option set. Earlier, such base iterators could lead to incorrect values being exposed from `BaseDeltaIterator`.

View File

@ -262,6 +262,17 @@ void BaseDeltaIterator::SetValueAndColumnsFromBase() {
assert(value_.empty());
assert(columns_.empty());
if (!base_iterator_->PrepareValue()) {
assert(!BaseValid());
assert(!base_iterator_->status().ok());
Invalidate(base_iterator_->status());
assert(!Valid());
assert(!status().ok());
return;
}
value_ = base_iterator_->value();
columns_ = base_iterator_->columns();
}
@ -314,6 +325,17 @@ void BaseDeltaIterator::SetValueAndColumnsFromDelta() {
/* result_operand */ nullptr, &result_type);
} else if (delta_entry.type == kMergeRecord) {
if (equal_keys_) {
if (!base_iterator_->PrepareValue()) {
assert(!BaseValid());
assert(!base_iterator_->status().ok());
Invalidate(base_iterator_->status());
assert(!Valid());
assert(!status().ok());
return;
}
if (WideColumnsHelper::HasDefaultColumnOnly(base_iterator_->columns())) {
status_ = WriteBatchWithIndexInternal::MergeKeyWithBaseValue(
column_family_, delta_entry.key, MergeHelper::kPlainBaseValue,

View File

@ -81,73 +81,116 @@ using KVMap = std::map<std::string, std::string>;
class KVIter : public Iterator {
public:
explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {}
explicit KVIter(const KVMap* map, bool allow_unprepared_value = false,
bool fail_prepare_value = false)
: map_(map),
iter_(map_->end()),
allow_unprepared_value_(allow_unprepared_value),
fail_prepare_value_(fail_prepare_value) {}
bool Valid() const override { return iter_ != map_->end(); }
bool Valid() const override { return status_.ok() && iter_ != map_->end(); }
void SeekToFirst() override {
status_ = Status::OK();
Reset();
iter_ = map_->begin();
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
void SeekToLast() override {
status_ = Status::OK();
Reset();
if (map_->empty()) {
iter_ = map_->end();
} else {
iter_ = map_->find(map_->rbegin()->first);
}
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
void Seek(const Slice& k) override {
status_ = Status::OK();
Reset();
iter_ = map_->lower_bound(k.ToString());
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
void SeekForPrev(const Slice& k) override {
status_ = Status::OK();
Reset();
iter_ = map_->upper_bound(k.ToString());
Prev();
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
void Next() override {
Reset();
++iter_;
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
void Prev() override {
Reset();
if (iter_ == map_->begin()) {
iter_ = map_->end();
return;
}
--iter_;
if (Valid()) {
if (Valid() && !allow_unprepared_value_) {
Update();
}
}
bool PrepareValue() override {
assert(Valid());
if (!allow_unprepared_value_) {
return true;
}
if (fail_prepare_value_) {
status_ = Status::Corruption("PrepareValue() failed");
return false;
}
Update();
return true;
}
Slice key() const override { return iter_->first; }
Slice value() const override { return value_; }
const WideColumns& columns() const override { return columns_; }
Status status() const override { return Status::OK(); }
Status status() const override { return status_; }
private:
void Reset() {
value_.clear();
columns_.clear();
}
void Update() {
assert(Valid());
@ -157,8 +200,11 @@ class KVIter : public Iterator {
const KVMap* const map_;
KVMap::const_iterator iter_;
Status status_;
Slice value_;
WideColumns columns_;
bool allow_unprepared_value_;
bool fail_prepare_value_;
};
static std::string PrintContents(WriteBatchWithIndex* batch,
@ -1693,6 +1739,182 @@ TEST_P(WriteBatchWithIndexTest, TestNewIteratorWithBaseFromWbwi) {
ASSERT_OK(iter->status());
}
// Checks that an iterator created via NewIteratorWithBase returns correct
// values when its base iterator only loads values on PrepareValue(), and that
// a PrepareValue() failure reported by the base iterator invalidates the
// combined iterator with the same error.
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) {
// BaseDeltaIterator by default should call PrepareValue if it lands on the
// base iterator in case it was created with allow_unprepared_value=true.
// (Note that BaseDeltaIterator itself does not support allow_unprepared_value
// yet but the base iterator might have been created with the flag set.)
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
// Base contents: three keys. The batch below overrides only "c", so "a" and
// "e" must be served from the base iterator.
KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}};
ASSERT_OK(batch_->Put(&cf1, "c", "cc1"));
// Success case: the base KVIter defers loading its value until PrepareValue()
// is called.
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1, new KVIter(&map, /* allow_unprepared_value */ true)));
// Forward scan: "a" and "e" carry the base values, "c" carries the batch
// value.
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_EQ(iter->value(), "aa");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_EQ(iter->value(), "ee");
// Stepping past the last key ends iteration without an error.
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
// Reverse scan over the same entries, expecting the same values.
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_EQ(iter->value(), "ee");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_EQ(iter->value(), "aa");
// Stepping before the first key ends iteration without an error.
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
// PrepareValue failures from the base iterator should be propagated
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1, new KVIter(&map, /* allow_unprepared_value */ true,
/* fail_prepare_value */ true)));
// The first key ("a") exists only in the base map, so its value has to come
// from the base iterator, whose PrepareValue() is set up to fail.
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
// Likewise for the last key ("e"), which is also present only in the base
// map.
iter->SeekToLast();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
}
}
// Checks that merging a batch Merge operand with a value from a real DB base
// iterator works when that base iterator was created with
// allow_unprepared_value=true, and that an injected failure while preparing
// the base value is surfaced as Corruption by the combined iterator.
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseMergePrepareValue) {
// When performing a merge across the base and delta iterators,
// BaseDeltaIterator should call PrepareValue on the base iterator in case it
// was created with allow_unprepared_value=true. (Note: we use BlobDB here
// to ensure PrepareValue is not a no-op.)
options_.enable_blob_files = true;
ASSERT_OK(OpenDB());
// Base data: three keys written to the DB and flushed.
// NOTE(review): presumably the flush moves these values into blob files so
// that reading them requires PrepareValue() -- confirm against the fixture's
// blob-related options.
ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "a", "aa"));
ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "c", "cc"));
ASSERT_OK(db_->Put(write_opts_, db_->DefaultColumnFamily(), "e", "ee"));
ASSERT_OK(db_->Flush(FlushOptions(), db_->DefaultColumnFamily()));
// Delta data: one Merge operand for every key in the DB, so each entry
// requires combining the base value with the batch operand.
ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "a", "aa1"));
ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "c", "cc1"));
ASSERT_OK(batch_->Merge(db_->DefaultColumnFamily(), "e", "ee1"));
// Only the base DB iterator gets allow_unprepared_value; the iterator
// returned by NewIteratorWithBase is given the unmodified read_opts_.
ReadOptions db_read_options = read_opts_;
db_read_options.allow_unprepared_value = true;
// Success case.
// NOTE(review): the expected values are "<base>,<operand>", which presumably
// reflects a string-append merge operator with a ',' delimiter configured by
// the fixture -- confirm.
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
db_->DefaultColumnFamily(),
db_->NewIterator(db_read_options, db_->DefaultColumnFamily()),
&read_opts_));
// Forward scan over all three merged entries.
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_EQ(iter->value(), "aa,aa1");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc,cc1");
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_EQ(iter->value(), "ee,ee1");
// Stepping past the last key ends iteration without an error.
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
// Reverse scan over the same entries, expecting the same merged values.
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_EQ(iter->value(), "ee,ee1");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc,cc1");
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_EQ(iter->value(), "aa,aa1");
// Stepping before the first key ends iteration without an error.
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
// Failure injection: the callback drops the first byte of the Slice handed to
// the "BlobFileReader::GetBlob:TamperWithResult" sync point.
// NOTE(review): presumably this makes the blob read fail validation and
// report Corruption, as the assertions below expect -- confirm against
// BlobFileReader.
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::GetBlob:TamperWithResult", [](void* arg) {
Slice* const blob_index = static_cast<Slice*>(arg);
assert(blob_index);
assert(!blob_index->empty());
blob_index->remove_prefix(1);
});
SyncPoint::GetInstance()->EnableProcessing();
// PrepareValue failures from the base iterator should be propagated
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
db_->DefaultColumnFamily(),
db_->NewIterator(db_read_options, db_->DefaultColumnFamily()),
&read_opts_));
// Every key needs its base value for the merge, so both the first and the
// last position are expected to fail.
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
iter->SeekToLast();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
}
// Turn sync point processing back off and remove the callback registered
// above.
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WriteBatchWithIndexTest, TestBoundsCheckingInDeltaIterator) {
Status s = OpenDB();
ASSERT_OK(s);