Introduce MultiCfIterator (#12153)

Summary:
This PR introduces a new implementation of `Iterator` via a new public API called `NewMultiCfIterator()`. The new API takes a vector of column family handles to build a cross-column-family iterator, which internally maintains multiple `DBIter`s as child iterators from a consistent database state. When a key exists in multiple column families, the iterator selects the value (and wide columns) from the first column family containing the key, following the order provided in the `column_families` parameter. Similar to the merging iterator, a min heap is used to iterate across the child iterators. Backward iteration and direction change functionalities will be implemented in future PRs.

The comparator used to compare keys across different column families will be derived from the iterator of the first column family specified in `column_families`. This comparator will be checked against the comparators from all other column families that the iterator will traverse. If there's a mismatch with any of the comparators, the initialization of the iterator will fail.

Please note that this PR is not enough for users to start using `MultiCfIterator`. The `MultiCfIterator` and related APIs are still marked as "**DO NOT USE - UNDER CONSTRUCTION**". This PR is just the first of many PRs that will follow soon.

This PR includes the following:
- Introduction and partial implementation of the `MultiCfIterator`, which implements the generic `Iterator` interface. The implementation includes the construction of the iterator, `SeekToFirst()`, `Next()`, `Valid()`, `key()`, `value()`, and `columns()`.
- Unit tests to verify iteration across multiple column families in two distinct scenarios: (1) keys are unique across all column families, and (2) the same keys exist in multiple column families.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12153

Reviewed By: pdillinger

Differential Revision: D52308697

Pulled By: jaykorean

fbshipit-source-id: b03e69f13b40af5a8f0598d0f43a0bec01ef8294
This commit is contained in:
Jay Huh 2024-03-05 10:22:43 -08:00 committed by Facebook GitHub Bot
parent 3fff57fa6a
commit 3412195367
14 changed files with 623 additions and 1 deletions

View File

@ -191,6 +191,7 @@ else()
endif()
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj")
add_definitions(-D_POSIX_C_SOURCE=1)
endif()
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
@ -691,6 +692,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/multi_cf_iterator.cc
db/output_validator.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc
@ -1343,6 +1345,7 @@ if(WITH_TESTS)
db/memtable_list_test.cc
db/merge_helper_test.cc
db/merge_test.cc
db/multi_cf_iterator_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_task_scheduler_test.cc

View File

@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

View File

@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/multi_cf_iterator.cc",
"db/output_validator.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="multi_cf_iterator_test",
srcs=["db/multi_cf_iterator_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="object_registry_test",
srcs=["utilities/object_registry_test.cc"],
deps=[":rocksdb_test_lib"],

View File

@ -44,7 +44,7 @@
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/multi_cf_iterator.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
return db_iter;
}
std::unique_ptr<Iterator> DBImpl::NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
if (column_families.size() == 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::InvalidArgument("No Column Family was provided")));
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs")));
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<MultiCfIterator>(first_comparator, column_families,
std::move(child_iterators));
}
return std::unique_ptr<Iterator>(NewErrorIterator(s));
}
Status DBImpl::NewIterators(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,

View File

@ -351,6 +351,13 @@ class DBImpl : public DB {
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
// is_write_conflict_boundary is true. For simplicity, set it to true by

View File

@ -3199,6 +3199,14 @@ class ModelDB : public DB {
std::vector<Iterator*>* /*iterators*/) override {
return Status::NotSupported("Not supported yet");
}
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
}
const Snapshot* GetSnapshot() override {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;

62
db/multi_cf_iterator.cc Normal file
View File

@ -0,0 +1,62 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/multi_cf_iterator.h"
#include <cassert>
namespace ROCKSDB_NAMESPACE {
void MultiCfIterator::SeekToFirst() {
Reset();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
iter->SeekToFirst();
if (iter->Valid()) {
assert(iter->status().ok());
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
considerStatus(iter->status());
}
++i;
}
}
void MultiCfIterator::Next() {
assert(Valid());
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = min_heap_.top();
min_heap_.pop();
if (!min_heap_.empty()) {
auto* current = min_heap_.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
current->Next();
if (current->Valid()) {
min_heap_.replace_top(min_heap_.top());
} else {
considerStatus(current->status());
min_heap_.pop();
}
if (!min_heap_.empty()) {
current = min_heap_.top().iterator;
}
}
}
top.iterator->Next();
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
min_heap_.push(top);
} else {
considerStatus(top.iterator->status());
}
}
} // namespace ROCKSDB_NAMESPACE

116
db/multi_cf_iterator.h Normal file
View File

@ -0,0 +1,116 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
namespace ROCKSDB_NAMESPACE {
// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column families, the iterator
// selects the value from the first column family containing the key, in the
// order provided in the `column_families` parameter.
class MultiCfIterator : public Iterator {
public:
MultiCfIterator(const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator),
min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
~MultiCfIterator() override { status_.PermitUncheckedError(); }
// No copy allowed
MultiCfIterator(const MultiCfIterator&) = delete;
MultiCfIterator& operator=(const MultiCfIterator&) = delete;
private:
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
ReadOptions read_options_;
Status status_;
AttributeGroups attribute_groups_;
struct MultiCfIteratorInfo {
Iterator* iterator;
ColumnFamilyHandle* cfh;
int order;
};
class MultiCfMinHeapItemComparator {
public:
explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}
bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
assert(b.iterator);
assert(a.iterator->Valid());
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : c > 0;
}
private:
const Comparator* comparator_;
};
const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;
MultiCfMinHeap min_heap_;
// TODO: MaxHeap for Reverse Iteration
// TODO: Lower and Upper bounds
Slice key() const override {
assert(Valid());
return min_heap_.top().iterator->key();
}
bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
Status status() const override { return status_; }
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = std::move(s);
}
}
void Reset() {
min_heap_.clear();
status_ = Status::OK();
}
void SeekToFirst() override;
void Next() override;
// TODO - Implement these
void Seek(const Slice& /*target*/) override {}
void SeekForPrev(const Slice& /*target*/) override {}
void SeekToLast() override {}
void Prev() override { assert(false); }
Slice value() const override {
assert(Valid());
return min_heap_.top().iterator->value();
}
const WideColumns& columns() const override {
assert(Valid());
return min_heap_.top().iterator->columns();
}
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,364 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <memory>
#include "db/db_test_util.h"
namespace ROCKSDB_NAMESPACE {
class MultiCfIteratorTest : public DBTestBase {
public:
MultiCfIteratorTest()
: DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {}
void verifyMultiCfIterator(
const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::optional<std::vector<Slice>>& expected_values = std::nullopt,
const std::optional<std::vector<WideColumns>>& expected_wide_columns =
std::nullopt,
const std::optional<std::vector<AttributeGroups>>&
expected_attribute_groups = std::nullopt) {
int i = 0;
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[i], iter->value());
}
if (expected_wide_columns.has_value()) {
ASSERT_EQ(expected_wide_columns.value()[i], iter->columns());
}
if (expected_attribute_groups.has_value()) {
// TODO - Add this back when attribute_groups() API is added
// ASSERT_EQ(expected_attribute_groups.value()[i],
// iter->attribute_groups());
}
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[i], iter->value());
}
++i;
}
ASSERT_EQ(expected_keys.size(), i);
ASSERT_OK(iter->status());
}
void verifyExpectedKeys(ColumnFamilyHandle* cfh,
const std::vector<Slice>& expected_keys) {
int i = 0;
Iterator* iter = db_->NewIterator(ReadOptions(), cfh);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
++i;
}
ASSERT_EQ(expected_keys.size(), i);
ASSERT_OK(iter->status());
delete iter;
}
};
TEST_F(MultiCfIteratorTest, InvalidArguments) {
Options options = GetDefaultOptions();
{
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
// Invalid - No CF is provided
std::unique_ptr<Iterator> iter_with_no_cf =
db_->NewMultiCfIterator(ReadOptions(), {});
ASSERT_NOK(iter_with_no_cf->status());
ASSERT_TRUE(iter_with_no_cf->status().IsInvalidArgument());
}
}
TEST_F(MultiCfIteratorTest, SimpleValues) {
Options options = GetDefaultOptions();
{
// Case 1: Unique key per CF
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_3", "key_3_cf_2_val"));
ASSERT_OK(Put(3, "key_4", "key_4_cf_3_val"));
std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3", "key_4"};
std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_2_val", "key_4_cf_3_val"};
// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values);
// Test for iteration over CF 3->1->default_cf->2
std::vector<ColumnFamilyHandle*> cfhs_order_3_1_0_2 = {
handles_[3], handles_[1], handles_[0], handles_[2]};
// Iteration order and the return values should be the same since keys are
// unique per CF
verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values);
}
{
// Case 2: Same key in multiple CFs
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val"));
ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val"));
ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val"));
ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val"));
std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3"};
// Test for iteration over CFs default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_0_val"};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values);
// Test for iteration over CFs 3->2->default_cf->1
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1 = {
handles_[3], handles_[2], handles_[0], handles_[1]};
expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"};
verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values);
}
}
TEST_F(MultiCfIteratorTest, WideColumns) {
// Set up the DB and Column Families
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
constexpr char key_1[] = "key_1";
WideColumns key_1_columns_in_cf_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
WideColumns key_1_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};
constexpr char key_2[] = "key_2";
WideColumns key_2_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
WideColumns key_2_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};
constexpr char key_3[] = "key_3";
WideColumns key_3_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
WideColumns key_3_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};
constexpr char key_4[] = "key_4";
WideColumns key_4_columns_in_cf_0{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
WideColumns key_4_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};
// Use AttributeGroup PutEntity API to insert them together
AttributeGroups key_1_attribute_groups{
AttributeGroup(handles_[2], key_1_columns_in_cf_2),
AttributeGroup(handles_[3], key_1_columns_in_cf_3)};
AttributeGroups key_2_attribute_groups{
AttributeGroup(handles_[1], key_2_columns_in_cf_1),
AttributeGroup(handles_[2], key_2_columns_in_cf_2)};
AttributeGroups key_3_attribute_groups{
AttributeGroup(handles_[1], key_3_columns_in_cf_1),
AttributeGroup(handles_[3], key_3_columns_in_cf_3)};
AttributeGroups key_4_attribute_groups{
AttributeGroup(handles_[0], key_4_columns_in_cf_0),
AttributeGroup(handles_[2], key_4_columns_in_cf_2)};
ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));
// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
std::vector<Slice> expected_keys = {key_1, key_2, key_3, key_4};
// Pick what DBIter would return for value() in the first CF that key exists
// Since value for kDefaultWideColumnName only exists for key_1, rest will
// return empty value
std::vector<Slice> expected_values = {"cf_2_col_val_0_key_1", "", "", ""};
// Pick columns from the first CF that the key exists and value is stored as
// wide column
std::vector<WideColumns> expected_wide_columns = {
{{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}},
{{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values,
expected_wide_columns);
}
TEST_F(MultiCfIteratorTest, DifferentComparatorsInMultiCFs) {
// This test creates two column families with two different comparators.
// Attempting to create the MultiCFIterator should fail.
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
options.comparator = BytewiseComparator();
CreateColumnFamilies({"cf_forward"}, options);
options.comparator = ReverseBytewiseComparator();
CreateColumnFamilies({"cf_reverse"}, options);
ASSERT_OK(Put(0, "key_1", "value_1"));
ASSERT_OK(Put(0, "key_2", "value_2"));
ASSERT_OK(Put(0, "key_3", "value_3"));
ASSERT_OK(Put(1, "key_1", "value_1"));
ASSERT_OK(Put(1, "key_2", "value_2"));
ASSERT_OK(Put(1, "key_3", "value_3"));
verifyExpectedKeys(handles_[0], {"key_1", "key_2", "key_3"});
verifyExpectedKeys(handles_[1], {"key_3", "key_2", "key_1"});
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsInvalidArgument());
}
TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) {
// This test creates two column families with the same custom test
// comparators (but instantiated independently). Attempting to create the
// MultiCFIterator should not fail.
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
static auto comparator_1 =
std::make_unique<test::SimpleSuffixReverseComparator>(
test::SimpleSuffixReverseComparator());
static auto comparator_2 =
std::make_unique<test::SimpleSuffixReverseComparator>(
test::SimpleSuffixReverseComparator());
ASSERT_NE(comparator_1, comparator_2);
options.comparator = comparator_1.get();
CreateColumnFamilies({"cf_1"}, options);
options.comparator = comparator_2.get();
CreateColumnFamilies({"cf_2"}, options);
ASSERT_OK(Put(0, "key_001_001", "value_0_3"));
ASSERT_OK(Put(0, "key_001_002", "value_0_2"));
ASSERT_OK(Put(0, "key_001_003", "value_0_1"));
ASSERT_OK(Put(0, "key_002_001", "value_0_6"));
ASSERT_OK(Put(0, "key_002_002", "value_0_5"));
ASSERT_OK(Put(0, "key_002_003", "value_0_4"));
ASSERT_OK(Put(1, "key_001_001", "value_1_3"));
ASSERT_OK(Put(1, "key_001_002", "value_1_2"));
ASSERT_OK(Put(1, "key_001_003", "value_1_1"));
ASSERT_OK(Put(1, "key_003_004", "value_1_6"));
ASSERT_OK(Put(1, "key_003_005", "value_1_5"));
ASSERT_OK(Put(1, "key_003_006", "value_1_4"));
verifyExpectedKeys(
handles_[0], {"key_001_003", "key_001_002", "key_001_001", "key_002_003",
"key_002_002", "key_002_001"});
verifyExpectedKeys(
handles_[1], {"key_001_003", "key_001_002", "key_001_001", "key_003_006",
"key_003_005", "key_003_004"});
std::vector<Slice> expected_keys = {
"key_001_003", "key_001_002", "key_001_001", "key_002_003", "key_002_002",
"key_002_001", "key_003_006", "key_003_005", "key_003_004"};
std::vector<Slice> expected_values = {"value_0_1", "value_0_2", "value_0_3",
"value_0_4", "value_0_5", "value_0_6",
"value_1_4", "value_1_5", "value_1_6"};
int i = 0;
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
ASSERT_EQ(expected_values[i], iter->value());
++i;
}
ASSERT_OK(iter->status());
}
TEST_F(MultiCfIteratorTest, DISABLED_IterateAttributeGroups) {
// Set up the DB and Column Families
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
constexpr char key_1[] = "key_1";
WideColumns key_1_columns_in_cf_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
WideColumns key_1_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};
constexpr char key_2[] = "key_2";
WideColumns key_2_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
WideColumns key_2_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};
constexpr char key_3[] = "key_3";
WideColumns key_3_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
WideColumns key_3_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};
constexpr char key_4[] = "key_4";
WideColumns key_4_columns_in_cf_0{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
WideColumns key_4_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};
AttributeGroups key_1_attribute_groups{
AttributeGroup(handles_[2], key_1_columns_in_cf_2),
AttributeGroup(handles_[3], key_1_columns_in_cf_3)};
AttributeGroups key_2_attribute_groups{
AttributeGroup(handles_[1], key_2_columns_in_cf_1),
AttributeGroup(handles_[2], key_2_columns_in_cf_2)};
AttributeGroups key_3_attribute_groups{
AttributeGroup(handles_[1], key_3_columns_in_cf_1),
AttributeGroup(handles_[3], key_3_columns_in_cf_3)};
AttributeGroups key_4_attribute_groups{
AttributeGroup(handles_[0], key_4_columns_in_cf_0),
AttributeGroup(handles_[2], key_4_columns_in_cf_2)};
ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));
// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
std::vector<Slice> expected_keys = {key_1, key_2, key_3, key_4};
std::vector<AttributeGroups> expected_attribute_groups = {
key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups,
key_4_attribute_groups};
verifyMultiCfIterator(
cfhs_order_0_1_2_3, expected_keys, std::nullopt /* expected_values */,
std::nullopt /* expected_wide_columns */, expected_attribute_groups);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -12,6 +12,7 @@ namespace ROCKSDB_NAMESPACE {
const Slice kDefaultWideColumnName;
const WideColumns kNoWideColumns;
const AttributeGroups kNoAttributeGroups;
Status PinnableWideColumns::CreateIndexForWideColumns() {
Slice value_copy = value_;

View File

@ -955,6 +955,15 @@ class DB {
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) = 0;
// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
// When the same key is present in multiple column families, the iterator
// selects the value or columns from the first column family containing the
// key, in the order specified by the `column_families` parameter.
virtual std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the

View File

@ -259,6 +259,13 @@ class StackableDB : public DB {
return db_->NewIterators(options, column_families, iterators);
}
using DB::NewMultiCfIterator;
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override {
return db_->NewMultiCfIterator(options, column_families);
}
const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
void ReleaseSnapshot(const Snapshot* snapshot) override {

View File

@ -238,9 +238,17 @@ class AttributeGroup {
WideColumns columns_;
};
inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) {
return lhs.column_family() == rhs.column_family() &&
lhs.columns() == rhs.columns();
}
// A collection of Attribute Groups.
using AttributeGroups = std::vector<AttributeGroup>;
// An empty set of Attribute Groups.
extern const AttributeGroups kNoAttributeGroups;
// Used in Read Path. Wide-columns returned from the query are pinnable.
class PinnableAttributeGroup {
public:

2
src.mk
View File

@ -76,6 +76,7 @@ LIB_SOURCES = \
db/memtable_list.cc \
db/merge_helper.cc \
db/merge_operator.cc \
db/multi_cf_iterator.cc \
db/output_validator.cc \
db/periodic_task_scheduler.cc \
db/range_del_aggregator.cc \
@ -516,6 +517,7 @@ TEST_MAIN_SOURCES = \
db/memtable_list_test.cc \
db/merge_helper_test.cc \
db/merge_test.cc \
db/multi_cf_iterator_test.cc \
db/obsolete_files_test.cc \
db/options_file_test.cc \
db/perf_context_test.cc \