Some small improvements around allow_unprepared_value and multi-CF iterators (#13113)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13113

The patch makes some small improvements related to `allow_unprepared_value` and multi-CF iterators as groundwork for further changes:

1) Similarly to `BaseDeltaIterator`'s base iterator, `MultiCfIteratorImpl` gets passed its child iterators by the client. Even though they are currently guaranteed to have been created using the same read options as each other and the multi-CF iterator, it is safer to not assume this and call `PrepareValue` unconditionally before using any child iterator's `value()` or `columns()`.
2) Again similarly to `BaseDeltaIterator`, it makes sense to pass the entire `ReadOptions` structure to `MultiCfIteratorImpl` in case it turns out to require other read options in the future.
3) The constructors of the various multi-CF iterator classes now take an rvalue reference to a vector of column family handle + `unique_ptr` to child iterator pairs and use move semantics to take ownership of this vector (instead of taking two separate vectors of column family handles and raw iterator pointers).
4) Constructor arguments and the members of `MultiCfIteratorImpl` are reordered for consistency.

Reviewed By: jowlyzhang

Differential Revision: D65407521

fbshipit-source-id: 66c2c689ec8b036740bd98641b7b5c0ff7e777f2
This commit is contained in:
Levi Tamasi 2024-11-04 18:06:07 -08:00 committed by Facebook GitHub Bot
parent e7ffca9493
commit 3becc9409e
4 changed files with 73 additions and 72 deletions

View File

@ -13,11 +13,11 @@ namespace ROCKSDB_NAMESPACE {
class AttributeGroupIteratorImpl : public AttributeGroupIterator {
public:
AttributeGroupIteratorImpl(
const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs)
: impl_(read_options, comparator, std::move(cfh_iter_pairs),
ResetFunc(this), PopulateFunc(this)) {}
~AttributeGroupIteratorImpl() override {}
// No copy allowed

View File

@ -12,11 +12,12 @@ namespace ROCKSDB_NAMESPACE {
// EXPERIMENTAL
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
CoalescingIterator(
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs)
: impl_(read_options, comparator, std::move(cfh_iter_pairs),
ResetFunc(this), PopulateFunc(this)) {}
~CoalescingIterator() override {}
// No copy allowed

View File

@ -3998,14 +3998,26 @@ std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
"Different comparators are being used across CFs"));
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (!s.ok()) {
return error_iterator_func(s);
}
return std::make_unique<ImplType>(
column_families[0]->GetComparator(), _read_options.allow_unprepared_value,
column_families, std::move(child_iterators));
assert(column_families.size() == child_iterators.size());
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs;
cfh_iter_pairs.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs.emplace_back(column_families[i],
std::unique_ptr<Iterator>(child_iterators[i]));
}
return std::make_unique<ImplType>(_read_options,
column_families[0]->GetComparator(),
std::move(cfh_iter_pairs));
}
Status DBImpl::NewIterators(

View File

@ -24,24 +24,18 @@ struct MultiCfIteratorInfo {
template <typename ResetFunc, typename PopulateFunc>
class MultiCfIteratorImpl {
public:
MultiCfIteratorImpl(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators,
MultiCfIteratorImpl(
const ReadOptions& read_options, const Comparator* comparator,
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>&&
cfh_iter_pairs,
ResetFunc reset_func, PopulateFunc populate_func)
: comparator_(comparator),
allow_unprepared_value_(allow_unprepared_value),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))),
: allow_unprepared_value_(read_options.allow_unprepared_value),
comparator_(comparator),
cfh_iter_pairs_(std::move(cfh_iter_pairs)),
reset_func_(std::move(reset_func)),
populate_func_(std::move(populate_func)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
populate_func_(std::move(populate_func)),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))) {}
~MultiCfIteratorImpl() { status_.PermitUncheckedError(); }
// No copy allowed
@ -108,38 +102,21 @@ class MultiCfIteratorImpl {
return true;
}
auto prepare_value_func = [this](auto& heap, Iterator* iterator) {
assert(iterator);
assert(iterator->Valid());
assert(iterator->status().ok());
if (!iterator->PrepareValue()) {
assert(!iterator->Valid());
assert(!iterator->status().ok());
considerStatus(iterator->status());
assert(!status_.ok());
heap.clear();
return false;
}
return true;
};
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
return PopulateIterator(std::get<MultiCfMaxHeap>(heap_),
prepare_value_func);
return PopulateIterator(std::get<MultiCfMaxHeap>(heap_));
}
return PopulateIterator(std::get<MultiCfMinHeap>(heap_),
prepare_value_func);
return PopulateIterator(std::get<MultiCfMinHeap>(heap_));
}
private:
Status status_;
bool allow_unprepared_value_;
const Comparator* comparator_;
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
Status status_;
ResetFunc reset_func_;
PopulateFunc populate_func_;
template <typename CompareOp>
class MultiCfHeapItemComparator {
@ -161,9 +138,6 @@ class MultiCfIteratorImpl {
const Comparator* comparator_;
};
const Comparator* comparator_;
bool allow_unprepared_value_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
@ -174,9 +148,6 @@ class MultiCfIteratorImpl {
MultiCfIterHeap heap_;
ResetFunc reset_func_;
PopulateFunc populate_func_;
Iterator* current() const {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
@ -230,10 +201,8 @@ class MultiCfIteratorImpl {
++i;
}
if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
[[maybe_unused]] const bool result = PopulateIterator(heap);
assert(result || (!Valid() && !status_.ok()));
}
}
@ -300,15 +269,13 @@ class MultiCfIteratorImpl {
}
if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
[[maybe_unused]] const bool result = PopulateIterator(heap);
assert(result || (!Valid() && !status_.ok()));
}
}
template <typename BinaryHeap, typename PrepareValueFunc>
bool PopulateIterator(BinaryHeap& heap, PrepareValueFunc prepare_value_func) {
template <typename BinaryHeap>
bool PopulateIterator(BinaryHeap& heap) {
// 1. Keep the top iterator (by popping it from the heap) and add it to list
// to populate
// 2. For all non-top iterators having the same key as top iter popped
@ -319,12 +286,33 @@ class MultiCfIteratorImpl {
// collected in step 1 and 2 and add all the iters back to the heap
assert(!heap.empty());
auto prepare_value = [this, &heap](Iterator* iterator) {
assert(iterator);
assert(iterator->Valid());
assert(iterator->status().ok());
if (!iterator->PrepareValue()) {
assert(!iterator->Valid());
assert(!iterator->status().ok());
considerStatus(iterator->status());
heap.clear();
assert(!Valid());
assert(!status_.ok());
return false;
}
return true;
};
auto top = heap.top();
assert(top.iterator);
assert(top.iterator->Valid());
assert(top.iterator->status().ok());
if (!prepare_value_func(heap, top.iterator)) {
if (!prepare_value(top.iterator)) {
return false;
}
@ -344,7 +332,7 @@ class MultiCfIteratorImpl {
break;
}
if (!prepare_value_func(heap, current.iterator)) {
if (!prepare_value(current.iterator)) {
return false;
}