MultiCfIterator - AttributeGroupIter Impl & CoalescingIter Optimization (#12534)

Summary:
Continuing from the previous MultiCfIterator Implementations - (https://github.com/facebook/rocksdb/issues/12422, https://github.com/facebook/rocksdb/issues/12480 #12465), this PR completes the `AttributeGroupIterator` by implementing `AttributeGroupIteratorImpl::AddToAttributeGroups()`. While implementing the `AttributeGroupIterator`, we had to make some changes in `MultiCfIteratorImpl` and found an opportunity to improve `Coalesce()` in `CoalescingIterator`.

Lifting `UNDER CONSTRUCTION - DO NOT USE` comment by replacing it with `EXPERIMENTAL`

Here are some implementation details:
- `IteratorAttributeGroups` is introduced to avoid having to copy all `WideColumn` objects during iteration.
- `PopulateIterator()` no longer advances non-top iterators that have the same key as the top iterator in the heap.
- `AdvanceIterator()` needs to advance the non-top iterators when they have the same key as the top iterator in the heap.
- Instead of populating one by one, `PopulateIterator()` now collects all items with the same key and calls `populate_func(items)` at once.
- This allowed optimization in `Coalesce()` such that we no longer do K-1 rounds of 2-way merge, but do one K-way merge instead.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12534

Test Plan:
Uncommented the assertions in `verifyAttributeGroupIterator()`

```
./multi_cf_iterator_test
```

Reviewed By: ltamasi

Differential Revision: D56089019

Pulled By: jaykorean

fbshipit-source-id: 6b0b4247e221f69b40b147d41492008cc9b15054
This commit is contained in:
Jay Huh 2024-04-16 08:45:38 -07:00 committed by Facebook GitHub Bot
parent d41e568b1c
commit d34712e0ac
11 changed files with 196 additions and 127 deletions

View File

@ -8,10 +8,13 @@
namespace ROCKSDB_NAMESPACE {
const AttributeGroups kNoAttributeGroups;
const IteratorAttributeGroups kNoIteratorAttributeGroups;
void AttributeGroupIteratorImpl::AddToAttributeGroups(
ColumnFamilyHandle* /*cfh*/, const WideColumns& /*columns*/) {
// TODO - Implement AttributeGroup population
const autovector<MultiCfIteratorInfo>& items) {
for (const auto& item : items) {
attribute_groups_.emplace_back(item.cfh, &item.iterator->columns());
}
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -18,8 +18,8 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator {
const std::vector<Iterator*>& child_iterators)
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle* cfh, Iterator* iter) {
AddToAttributeGroups(cfh, iter->columns());
[this](const autovector<MultiCfIteratorInfo>& items) {
AddToAttributeGroups(items);
}) {}
~AttributeGroupIteratorImpl() override {}
@ -38,7 +38,7 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator {
Slice key() const override { return impl_.key(); }
Status status() const override { return impl_.status(); }
const AttributeGroups& attribute_groups() const override {
const IteratorAttributeGroups& attribute_groups() const override {
assert(Valid());
return attribute_groups_;
}
@ -47,9 +47,8 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator {
private:
MultiCfIteratorImpl impl_;
AttributeGroups attribute_groups_;
void AddToAttributeGroups(ColumnFamilyHandle* cfh,
const WideColumns& columns);
IteratorAttributeGroups attribute_groups_;
void AddToAttributeGroups(const autovector<MultiCfIteratorInfo>& items);
};
class EmptyAttributeGroupIterator : public AttributeGroupIterator {
@ -68,8 +67,8 @@ class EmptyAttributeGroupIterator : public AttributeGroupIterator {
}
Status status() const override { return status_; }
const AttributeGroups& attribute_groups() const override {
return kNoAttributeGroups;
const IteratorAttributeGroups& attribute_groups() const override {
return kNoIteratorAttributeGroups;
}
private:

View File

@ -9,35 +9,35 @@
namespace ROCKSDB_NAMESPACE {
void CoalescingIterator::Coalesce(const WideColumns& columns) {
WideColumns coalesced;
coalesced.reserve(wide_columns_.size() + columns.size());
auto base_col_iter = wide_columns_.begin();
auto new_col_iter = columns.begin();
while (base_col_iter != wide_columns_.end() &&
new_col_iter != columns.end()) {
auto comparison = base_col_iter->name().compare(new_col_iter->name());
if (comparison < 0) {
coalesced.push_back(*base_col_iter);
++base_col_iter;
} else if (comparison > 0) {
coalesced.push_back(*new_col_iter);
++new_col_iter;
} else {
coalesced.push_back(*new_col_iter);
++new_col_iter;
++base_col_iter;
void CoalescingIterator::Coalesce(
const autovector<MultiCfIteratorInfo>& items) {
assert(wide_columns_.empty());
MinHeap heap;
for (const auto& item : items) {
assert(item.iterator);
for (auto& column : item.iterator->columns()) {
heap.push(WideColumnWithOrder{&column, item.order});
}
}
while (base_col_iter != wide_columns_.end()) {
coalesced.push_back(*base_col_iter);
++base_col_iter;
if (heap.empty()) {
return;
}
while (new_col_iter != columns.end()) {
coalesced.push_back(*new_col_iter);
++new_col_iter;
wide_columns_.reserve(heap.size());
auto current = heap.top();
heap.pop();
while (!heap.empty()) {
int comparison = current.column->name().compare(heap.top().column->name());
if (comparison < 0) {
wide_columns_.push_back(*current.column);
} else if (comparison > 0) {
// Shouldn't reach here.
// Current item in the heap is greater than the top item in the min heap
assert(false);
}
current = heap.top();
heap.pop();
}
wide_columns_.swap(coalesced);
wide_columns_.push_back(*current.column);
if (WideColumnsHelper::HasDefaultColumn(wide_columns_)) {
value_ = WideColumnsHelper::GetDefaultColumn(wide_columns_);

View File

@ -9,7 +9,7 @@
namespace ROCKSDB_NAMESPACE {
// UNDER CONSTRUCTION - DO NOT USE
// EXPERIMENTAL
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const Comparator* comparator,
@ -17,8 +17,8 @@ class CoalescingIterator : public Iterator {
const std::vector<Iterator*>& child_iterators)
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle*, Iterator* iter) {
Coalesce(iter->columns());
[this](const autovector<MultiCfIteratorInfo>& items) {
Coalesce(items);
}) {}
~CoalescingIterator() override {}
@ -55,7 +55,25 @@ class CoalescingIterator : public Iterator {
Slice value_;
WideColumns wide_columns_;
void Coalesce(const WideColumns& columns);
struct WideColumnWithOrder {
const WideColumn* column;
int order;
};
class WideColumnWithOrderComparator {
public:
explicit WideColumnWithOrderComparator() {}
bool operator()(const WideColumnWithOrder& a,
const WideColumnWithOrder& b) const {
int c = a.column->name().compare(b.column->name());
return c == 0 ? a.order - b.order > 0 : c > 0;
}
};
using MinHeap =
BinaryHeap<WideColumnWithOrder, WideColumnWithOrderComparator>;
void Coalesce(const autovector<MultiCfIteratorInfo>& items);
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -352,12 +352,12 @@ class DBImpl : public DB {
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
// UNDER CONSTRUCTION - DO NOT USE
// EXPERIMENTAL
std::unique_ptr<Iterator> NewCoalescingIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
// UNDER CONSTRUCTION - DO NOT USE
// EXPERIMENTAL
std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

View File

@ -26,6 +26,7 @@
#endif
#include "cache/lru_cache.h"
#include "db/attribute_group_iterator_impl.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/db_impl/db_impl.h"
@ -3199,18 +3200,18 @@ class ModelDB : public DB {
return Status::NotSupported("Not supported yet");
}
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<Iterator> NewCoalescingIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
return std::unique_ptr<Iterator>(
NewErrorIterator(Status::NotSupported("Not supported yet")));
}
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
return NewAttributeGroupErrorIterator(
Status::NotSupported("Not supported yet"));
}
const Snapshot* GetSnapshot() override {

View File

@ -15,6 +15,12 @@
namespace ROCKSDB_NAMESPACE {
struct MultiCfIteratorInfo {
ColumnFamilyHandle* cfh;
Iterator* iterator;
int order;
};
class MultiCfIteratorImpl {
public:
MultiCfIteratorImpl(
@ -22,7 +28,7 @@ class MultiCfIteratorImpl {
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators,
std::function<void()> reset_func,
std::function<void(ColumnFamilyHandle*, Iterator*)> populate_func)
std::function<void(const autovector<MultiCfIteratorInfo>&)> populate_func)
: comparator_(comparator),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))),
@ -60,27 +66,20 @@ class MultiCfIteratorImpl {
void SeekToFirst() {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(
min_heap, [](Iterator* iter) { iter->SeekToFirst(); },
[](Iterator* iter) { iter->Next(); });
SeekCommon(min_heap, [](Iterator* iter) { iter->SeekToFirst(); });
}
void Seek(const Slice& target) {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(
min_heap, [&target](Iterator* iter) { iter->Seek(target); },
[](Iterator* iter) { iter->Next(); });
SeekCommon(min_heap, [&target](Iterator* iter) { iter->Seek(target); });
}
void SeekToLast() {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(
max_heap, [](Iterator* iter) { iter->SeekToLast(); },
[](Iterator* iter) { iter->Prev(); });
SeekCommon(max_heap, [](Iterator* iter) { iter->SeekToLast(); });
}
void SeekForPrev(const Slice& target) {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(
max_heap, [&target](Iterator* iter) { iter->SeekForPrev(target); },
[](Iterator* iter) { iter->Prev(); });
SeekCommon(max_heap,
[&target](Iterator* iter) { iter->SeekForPrev(target); });
}
void Next() {
@ -107,11 +106,6 @@ class MultiCfIteratorImpl {
cfh_iter_pairs_;
Status status_;
struct MultiCfIteratorInfo {
Iterator* iterator;
int order;
};
template <typename CompareOp>
class MultiCfHeapItemComparator {
public:
@ -143,7 +137,7 @@ class MultiCfIteratorImpl {
MultiCfIterHeap heap_;
std::function<void()> reset_func_;
std::function<void(ColumnFamilyHandle*, Iterator*)> populate_func_;
std::function<void(autovector<MultiCfIteratorInfo>)> populate_func_;
// TODO: Lower and Upper bounds
@ -179,19 +173,16 @@ class MultiCfIteratorImpl {
MultiCfHeapItemComparator<std::less<int>>(comparator_));
}
template <typename BinaryHeap, typename ChildSeekFuncType,
typename AdvanceFuncType>
void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func,
AdvanceFuncType advance_func) {
template <typename BinaryHeap, typename ChildSeekFuncType>
void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func) {
reset_func_();
heap.clear();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& iter = cfh_iter_pair.second;
for (auto& [cfh, iter] : cfh_iter_pairs_) {
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
heap.push(MultiCfIteratorInfo{iter.get(), i});
heap.push(MultiCfIteratorInfo{cfh, iter.get(), i});
} else {
considerStatus(iter->status());
if (!status_.ok()) {
@ -203,7 +194,7 @@ class MultiCfIteratorImpl {
++i;
}
if (!heap.empty()) {
PopulateIterator(heap, advance_func);
PopulateIterator(heap);
}
}
@ -212,66 +203,88 @@ class MultiCfIteratorImpl {
assert(!heap.empty());
reset_func_();
// Because PopulateIterator() advances the same key in all non-top
// iterators, we do not have to do that here again. Just advance the top
// iterator and re-heapify
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = heap.top();
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
heap.replace_top(top);
} else {
considerStatus(top.iterator->status());
if (!status_.ok()) {
heap.clear();
return;
} else {
heap.pop();
}
}
if (!heap.empty()) {
PopulateIterator(heap, advance_func);
}
}
template <typename BinaryHeap, typename AdvanceFuncType>
void PopulateIterator(BinaryHeap& heap, AdvanceFuncType advance_func) {
// 1. Keep the top iterator (by popping it from the heap) and populate
// value, columns and attribute_groups
// 2. Make sure all others have iterated past the top iterator key slice.
// While iterating, coalesce/populate value, columns and attribute_groups
// 3. Add the top iterator back without advancing it
assert(!heap.empty());
auto top = heap.top();
auto& [top_cfh, top_iter] = cfh_iter_pairs_[top.order];
populate_func_(top_cfh, top_iter.get());
heap.pop();
if (!heap.empty()) {
auto* current = heap.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
auto& [curr_cfh, curr_iter] = cfh_iter_pairs_[heap.top().order];
populate_func_(curr_cfh, curr_iter.get());
advance_func(current);
if (current->Valid()) {
auto current = heap.top();
assert(current.iterator);
while (current.iterator->Valid() &&
comparator_->Compare(top.iterator->key(),
current.iterator->key()) == 0) {
assert(current.iterator->status().ok());
advance_func(current.iterator);
if (current.iterator->Valid()) {
heap.replace_top(heap.top());
} else {
considerStatus(current->status());
considerStatus(current.iterator->status());
if (!status_.ok()) {
// Non-OK status from the iterator. Bail out early
heap.clear();
break;
return;
} else {
heap.pop();
}
}
if (!heap.empty()) {
current = heap.top().iterator;
current = heap.top();
}
}
}
heap.push(top);
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
heap.push(top);
} else {
considerStatus(top.iterator->status());
if (!status_.ok()) {
heap.clear();
return;
}
}
if (!heap.empty()) {
PopulateIterator(heap);
}
}
template <typename BinaryHeap>
void PopulateIterator(BinaryHeap& heap) {
// 1. Keep the top iterator (by popping it from the heap) and add it to list
// to populate
// 2. For all non-top iterators having the same key as top iter popped
// from the previous step, add them to the same list and pop it
// temporarily from the heap
// 3. Once no other iters have the same key as the top iter from step 1,
// populate the value/columns and attribute_groups from the list
// collected in step 1 and 2 and add all the iters back to the heap
assert(!heap.empty());
auto top = heap.top();
heap.pop();
autovector<MultiCfIteratorInfo> to_populate;
to_populate.push_back(top);
if (!heap.empty()) {
auto current = heap.top();
assert(current.iterator);
while (current.iterator->Valid() &&
comparator_->Compare(top.iterator->key(),
current.iterator->key()) == 0) {
assert(current.iterator->status().ok());
to_populate.push_back(current);
heap.pop();
if (!heap.empty()) {
current = heap.top();
} else {
break;
}
}
}
// Add the items back to the heap
for (auto& item : to_populate) {
heap.push(item);
}
populate_func_(to_populate);
}
};

View File

@ -468,18 +468,25 @@ class AttributeGroupIteratorTest : public DBTestBase {
AttributeGroupIteratorTest()
: DBTestBase("attribute_group_iterator_test", /*env_do_fsync=*/true) {}
// TODO - Verify AttributeGroup Iterator
void verifyAttributeGroupIterator(
const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::vector<AttributeGroups>& /*expected_attribute_groups*/) {
const std::vector<AttributeGroups>& expected_attribute_groups) {
int i = 0;
std::unique_ptr<AttributeGroupIterator> iter =
db_->NewAttributeGroupIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
// TODO - uncomment this after implementing attribute_groups()
// ASSERT_EQ(expected_attribute_groups[i], iter->attribute_groups());
auto iterator_attribute_groups = iter->attribute_groups();
ASSERT_EQ(expected_attribute_groups[i].size(),
iterator_attribute_groups.size());
for (size_t cfh_i = 0; cfh_i < iterator_attribute_groups.size();
cfh_i++) {
ASSERT_EQ(expected_attribute_groups[i][cfh_i].column_family(),
iterator_attribute_groups[cfh_i].column_family());
ASSERT_EQ(expected_attribute_groups[i][cfh_i].columns(),
iterator_attribute_groups[cfh_i].columns());
}
++i;
}
ASSERT_EQ(expected_keys.size(), i);
@ -488,8 +495,16 @@ class AttributeGroupIteratorTest : public DBTestBase {
int rev_i = i - 1;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_EQ(expected_keys[rev_i], iter->key());
// TODO - uncomment this after implementing attribute_groups()
// ASSERT_EQ(expected_attribute_groups[rev_i], iter->attribute_groups());
auto iterator_attribute_groups = iter->attribute_groups();
ASSERT_EQ(expected_attribute_groups[rev_i].size(),
iterator_attribute_groups.size());
for (size_t cfh_i = 0; cfh_i < iterator_attribute_groups.size();
cfh_i++) {
ASSERT_EQ(expected_attribute_groups[rev_i][cfh_i].column_family(),
iterator_attribute_groups[cfh_i].column_family());
ASSERT_EQ(expected_attribute_groups[rev_i][cfh_i].columns(),
iterator_attribute_groups[cfh_i].columns());
}
rev_i--;
}
ASSERT_OK(iter->status());

View File

@ -77,7 +77,26 @@ inline void PinnableAttributeGroup::Reset() {
// A collection of Pinnable Attribute Groups.
using PinnableAttributeGroups = std::vector<PinnableAttributeGroup>;
// UNDER CONSTRUCTION - DO NOT USE
// Used in Iterator Path. Uses pointers to the columns to avoid having to copy
// all WideColumns objs during iteration.
class IteratorAttributeGroup {
public:
explicit IteratorAttributeGroup(ColumnFamilyHandle* column_family,
const WideColumns* columns)
: column_family_(column_family), columns_(columns) {}
ColumnFamilyHandle* column_family() const { return column_family_; }
const WideColumns& columns() const { return *columns_; }
private:
ColumnFamilyHandle* column_family_;
const WideColumns* columns_;
};
using IteratorAttributeGroups = std::vector<IteratorAttributeGroup>;
extern const IteratorAttributeGroups kNoIteratorAttributeGroups;
// EXPERIMENTAL
// A cross-column-family iterator that collects and returns attribute groups for
// each key in order provided by comparator
class AttributeGroupIterator : public IteratorBase {
@ -89,7 +108,7 @@ class AttributeGroupIterator : public IteratorBase {
AttributeGroupIterator(const AttributeGroupIterator&) = delete;
AttributeGroupIterator& operator=(const AttributeGroupIterator&) = delete;
virtual const AttributeGroups& attribute_groups() const = 0;
virtual const IteratorAttributeGroups& attribute_groups() const = 0;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -972,7 +972,7 @@ class DB {
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) = 0;
// UNDER CONSTRUCTION - DO NOT USE
// EXPERIMENTAL
// Return a cross-column-family iterator from a consistent database state.
//
// If a key exists in more than one column family, value() will be determined
@ -990,7 +990,7 @@ class DB {
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
// UNDER CONSTRUCTION - DO NOT USE
// EXPERIMENTAL
// A cross-column-family iterator that collects and returns attribute groups
// for each key in order provided by comparator
virtual std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(

View File

@ -0,0 +1 @@
[Experimental] Introduce two new cross-column-family iterators - CoalescingIterator and AttributeGroupIterator. The CoalescingIterator enables users to iterate over multiple column families and access their values and columns. During this iteration, if the same key exists in more than one column family, the keys in the later column family will overshadow the previous ones. The AttributeGroupIterator allows users to gather wide columns per Column Family and create attribute groups while iterating over keys across all CFs.