MultiCfIterator Implementations (#12422)

Summary:
This PR continues https://github.com/facebook/rocksdb/issues/12153 by implementing the missing `Iterator` APIs - `Seek()`, `SeekForPrev()`, `SeekToLast()`, and `Prev`. A MaxHeap Implementation has been added to handle the reverse direction.

The current implementation does not include upper/lower bounds yet. These will be added in subsequent PRs. The API is still marked as under construction and will be lifted after being added to the stress test.

Please note that changing the iterator direction in the middle of iteration is expensive, as it requires seeking the element in each iterator again in the opposite direction and rebuilding the heap along the way. The first `Next()` after `SeekForPrev()` requires changing the direction under the current implementation. We may optimize this in later PRs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12422

Test Plan: The `multi_cf_iterator_test` has been extended to cover the API implementations.

Reviewed By: pdillinger

Differential Revision: D54820754

Pulled By: jaykorean

fbshipit-source-id: 9eb741508df0f7bad598fb8e6bd5cdffc39e81d1
This commit is contained in:
Jay Huh 2024-03-18 09:05:30 -07:00 committed by Facebook GitHub Bot
parent 3d5be596a5
commit db1dea22b1
3 changed files with 284 additions and 45 deletions

View File

@ -9,16 +9,25 @@
namespace ROCKSDB_NAMESPACE {
void MultiCfIterator::SeekToFirst() {
void MultiCfIterator::SeekCommon(
const std::function<void(Iterator*)>& child_seek_func,
Direction direction) {
direction_ = direction;
Reset();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
iter->SeekToFirst();
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
if (direction_ == kReverse) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
max_heap.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
min_heap.push(MultiCfIteratorInfo{iter.get(), cfh, i});
}
} else {
considerStatus(iter->status());
}
@ -26,37 +35,69 @@ void MultiCfIterator::SeekToFirst() {
}
}
void MultiCfIterator::Next() {
assert(Valid());
template <typename BinaryHeap>
void MultiCfIterator::AdvanceIterator(
BinaryHeap& heap, const std::function<void(Iterator*)>& advance_func) {
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = min_heap_.top();
min_heap_.pop();
if (!min_heap_.empty()) {
auto* current = min_heap_.top().iterator;
auto top = heap.top();
heap.pop();
if (!heap.empty()) {
auto* current = heap.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
current->Next();
advance_func(current);
if (current->Valid()) {
min_heap_.replace_top(min_heap_.top());
heap.replace_top(heap.top());
} else {
considerStatus(current->status());
min_heap_.pop();
heap.pop();
}
if (!min_heap_.empty()) {
current = min_heap_.top().iterator;
if (!heap.empty()) {
current = heap.top().iterator;
}
}
}
top.iterator->Next();
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
min_heap_.push(top);
heap.push(top);
} else {
considerStatus(top.iterator->status());
}
}
void MultiCfIterator::SeekToFirst() {
SeekCommon([](Iterator* iter) { iter->SeekToFirst(); }, kForward);
}
void MultiCfIterator::Seek(const Slice& target) {
SeekCommon([&target](Iterator* iter) { iter->Seek(target); }, kForward);
}
void MultiCfIterator::SeekToLast() {
SeekCommon([](Iterator* iter) { iter->SeekToLast(); }, kReverse);
}
void MultiCfIterator::SeekForPrev(const Slice& target) {
SeekCommon([&target](Iterator* iter) { iter->SeekForPrev(target); },
kReverse);
}
void MultiCfIterator::Next() {
assert(Valid());
if (direction_ != kForward) {
SwitchToDirection(kForward);
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
AdvanceIterator(min_heap, [](Iterator* iter) { iter->Next(); });
}
void MultiCfIterator::Prev() {
assert(Valid());
if (direction_ != kReverse) {
SwitchToDirection(kReverse);
}
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); });
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -5,10 +5,13 @@
#pragma once
#include <variant>
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
#include "util/overload.h"
namespace ROCKSDB_NAMESPACE {
@ -23,7 +26,8 @@ class MultiCfIterator : public Iterator {
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator),
min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
@ -52,11 +56,11 @@ class MultiCfIterator : public Iterator {
int order;
};
class MultiCfMinHeapItemComparator {
template <typename CompareOp>
class MultiCfHeapItemComparator {
public:
explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
explicit MultiCfHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}
bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
@ -65,25 +69,59 @@ class MultiCfIterator : public Iterator {
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : c > 0;
return c == 0 ? a.order - b.order > 0 : CompareOp()(c, 0);
}
private:
const Comparator* comparator_;
};
const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;
MultiCfMinHeap min_heap_;
// TODO: MaxHeap for Reverse Iteration
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
using MultiCfMaxHeap = BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::less<int>>>;
using MultiCfIterHeap = std::variant<MultiCfMinHeap, MultiCfMaxHeap>;
MultiCfIterHeap heap_;
enum Direction : uint8_t { kForward, kReverse };
Direction direction_ = kForward;
// TODO: Lower and Upper bounds
Iterator* current() const {
if (direction_ == kReverse) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return max_heap.top().iterator;
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return min_heap.top().iterator;
}
Slice key() const override {
assert(Valid());
return min_heap_.top().iterator->key();
return current()->key();
}
bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
Slice value() const override {
assert(Valid());
return current()->value();
}
const WideColumns& columns() const override {
assert(Valid());
return current()->columns();
}
bool Valid() const override {
if (direction_ == kReverse) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return !max_heap.empty() && status_.ok();
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return !min_heap.empty() && status_.ok();
}
Status status() const override { return status_; }
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
@ -91,26 +129,53 @@ class MultiCfIterator : public Iterator {
}
}
void Reset() {
min_heap_.clear();
std::visit(overload{[&](MultiCfMinHeap& min_heap) -> void {
min_heap.clear();
if (direction_ == kReverse) {
InitMaxHeap();
}
},
[&](MultiCfMaxHeap& max_heap) -> void {
max_heap.clear();
if (direction_ == kForward) {
InitMinHeap();
}
}},
heap_);
status_ = Status::OK();
}
void SeekToFirst() override;
void Next() override;
void InitMinHeap() {
heap_.emplace<MultiCfMinHeap>(
MultiCfHeapItemComparator<std::greater<int>>(comparator_));
}
void InitMaxHeap() {
heap_.emplace<MultiCfMaxHeap>(
MultiCfHeapItemComparator<std::less<int>>(comparator_));
}
// TODO - Implement these
void Seek(const Slice& /*target*/) override {}
void SeekForPrev(const Slice& /*target*/) override {}
void SeekToLast() override {}
void Prev() override { assert(false); }
Slice value() const override {
assert(Valid());
return min_heap_.top().iterator->value();
}
const WideColumns& columns() const override {
assert(Valid());
return min_heap_.top().iterator->columns();
void SwitchToDirection(Direction new_direction) {
assert(direction_ != new_direction);
Slice target = key();
if (new_direction == kForward) {
Seek(target);
} else {
SeekForPrev(target);
}
}
void SeekCommon(const std::function<void(Iterator*)>& child_seek_func,
Direction direction);
template <typename BinaryHeap>
void AdvanceIterator(BinaryHeap& heap,
const std::function<void(Iterator*)>& advance_func);
void SeekToFirst() override;
void SeekToLast() override;
void Seek(const Slice& /*target*/) override;
void SeekForPrev(const Slice& /*target*/) override;
void Next() override;
void Prev() override;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -14,6 +14,8 @@ class MultiCfIteratorTest : public DBTestBase {
MultiCfIteratorTest()
: DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {}
// Verify Iteration of MultiCfIterator
// by SeekToFirst() + Next() and SeekToLast() + Prev()
void verifyMultiCfIterator(
const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
@ -38,13 +40,28 @@ class MultiCfIteratorTest : public DBTestBase {
// ASSERT_EQ(expected_attribute_groups.value()[i],
// iter->attribute_groups());
}
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[i], iter->value());
}
++i;
}
ASSERT_EQ(expected_keys.size(), i);
ASSERT_OK(iter->status());
int rev_i = i - 1;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_EQ(expected_keys[rev_i], iter->key());
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[rev_i], iter->value());
}
if (expected_wide_columns.has_value()) {
ASSERT_EQ(expected_wide_columns.value()[rev_i], iter->columns());
}
if (expected_attribute_groups.has_value()) {
// TODO - Add this back when attribute_groups() API is added
// ASSERT_EQ(expected_attribute_groups.value()[rev_i],
// iter->attribute_groups());
}
rev_i--;
}
ASSERT_OK(iter->status());
}
void verifyExpectedKeys(ColumnFamilyHandle* cfh,
@ -100,6 +117,41 @@ TEST_F(MultiCfIteratorTest, SimpleValues) {
// Iteration order and the return values should be the same since keys are
// unique per CF
verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values);
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_2");
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_2_val");
iter->Seek("key_x");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->SeekForPrev("key_x");
ASSERT_EQ(IterStatus(iter.get()), "key_4->key_4_cf_3_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_2_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_4->key_4_cf_3_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}
{
// Case 2: Same key in multiple CFs
@ -129,6 +181,87 @@ TEST_F(MultiCfIteratorTest, SimpleValues) {
handles_[3], handles_[2], handles_[0], handles_[1]};
expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"};
verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values);
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
iter->Seek("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
iter->Seek("key_2");
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val");
iter->Seek("key_x");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val");
iter->SeekForPrev("key_x");
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}
}
TEST_F(MultiCfIteratorTest, EmptyCfs) {
Options options = GetDefaultOptions();
{
// Case 1: No keys in any of the CFs
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->Seek("foo");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("foo");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
ASSERT_OK(iter->status());
}
{
// Case 2: A single key exists in only one of the CF. Rest CFs are empty.
ASSERT_OK(Put(1, "key_1", "key_1_cf_1_val"));
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_1_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
{
// Case 3: same key exists in all of the CFs except one (cf_2)
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
// handles_ are in the order of 0->1->2->3. We should expect value from cf_0
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}