From 341219536725fd8fabb037c0d8ab192c4b038a60 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Tue, 5 Mar 2024 10:22:43 -0800 Subject: [PATCH] Introduce MultiCfIterator (#12153) Summary: This PR introduces a new implementation of `Iterator` via a new public API called `NewMultiCfIterator()`. The new API takes a vector of column family handles to build a cross-column-family iterator, which internally maintains multiple `DBIter`s as child iterators from a consistent database state. When a key exists in multiple column families, the iterator selects the value (and wide columns) from the first column family containing the key, following the order provided in the `column_families` parameter. Similar to the merging iterator, a min heap is used to iterate across the child iterators. Backward iteration and direction change functionalities will be implemented in future PRs. The comparator used to compare keys across different column families will be derived from the iterator of the first column family specified in `column_families`. This comparator will be checked against the comparators from all other column families that the iterator will traverse. If there's a mismatch with any of the comparators, the initialization of the iterator will fail. Please note that this PR is not enough for users to start using `MultiCfIterator`. The `MultiCfIterator` and related APIs are still marked as "**DO NOT USE - UNDER CONSTRUCTION**". This PR is just the first of many PRs that will follow soon. This PR includes the following: - Introduction and partial implementation of the `MultiCfIterator`, which implements the generic `Iterator` interface. The implementation includes the construction of the iterator, `SeekToFirst()`, `Next()`, `Valid()`, `key()`, `value()`, and `columns()`. - Unit tests to verify iteration across multiple column families in two distinct scenarios: (1) keys are unique across all column families, and (2) the same keys exist in multiple column families. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12153 Reviewed By: pdillinger Differential Revision: D52308697 Pulled By: jaykorean fbshipit-source-id: b03e69f13b40af5a8f0598d0f43a0bec01ef8294 --- CMakeLists.txt | 3 + Makefile | 3 + TARGETS | 7 + db/db_impl/db_impl.cc | 27 +- db/db_impl/db_impl.h | 7 + db/db_test.cc | 8 + db/multi_cf_iterator.cc | 62 ++++ db/multi_cf_iterator.h | 116 ++++++++ db/multi_cf_iterator_test.cc | 364 +++++++++++++++++++++++ db/wide/wide_columns.cc | 1 + include/rocksdb/db.h | 9 + include/rocksdb/utilities/stackable_db.h | 7 + include/rocksdb/wide_columns.h | 8 + src.mk | 2 + 14 files changed, 623 insertions(+), 1 deletion(-) create mode 100644 db/multi_cf_iterator.cc create mode 100644 db/multi_cf_iterator.h create mode 100644 db/multi_cf_iterator_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 5cfc1b4803..24d0f3ef61 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -191,6 +191,7 @@ else() endif() if(MINGW) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj") add_definitions(-D_POSIX_C_SOURCE=1) endif() if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") @@ -691,6 +692,7 @@ set(SOURCES db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc + db/multi_cf_iterator.cc db/output_validator.cc db/periodic_task_scheduler.cc db/range_del_aggregator.cc @@ -1343,6 +1345,7 @@ if(WITH_TESTS) db/memtable_list_test.cc db/merge_helper_test.cc db/merge_test.cc + db/multi_cf_iterator_test.cc db/options_file_test.cc db/perf_context_test.cc db/periodic_task_scheduler_test.cc diff --git a/Makefile b/Makefile index 74a35d3fe8..cafefb4954 100644 --- a/Makefile +++ b/Makefile @@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY) dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index e8aaf325d4..73ddeabc31 100644 --- a/TARGETS +++ b/TARGETS @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/memtable_list.cc", "db/merge_helper.cc", "db/merge_operator.cc", + "db/multi_cf_iterator.cc", "db/output_validator.cc", "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="multi_cf_iterator_test", + srcs=["db/multi_cf_iterator_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="object_registry_test", srcs=["utilities/object_registry_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a8ea205b4a..9e4c3b0c19 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -44,7 +44,7 @@ #include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" -#include "db/merge_helper.h" +#include "db/multi_cf_iterator.h" #include "db/periodic_task_scheduler.h" #include "db/range_tombstone_fragmenter.h" #include "db/table_cache.h" @@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl( return db_iter; } +std::unique_ptr DBImpl::NewMultiCfIterator( + const ReadOptions& _read_options, + const std::vector& column_families) { + if (column_families.size() == 0) { + return std::unique_ptr(NewErrorIterator( + Status::InvalidArgument("No Column Family was provided"))); + } + const Comparator* first_comparator = column_families[0]->GetComparator(); + for (size_t i = 1; i < column_families.size(); ++i) { + const Comparator* cf_comparator = column_families[i]->GetComparator(); + if (first_comparator != cf_comparator && + first_comparator->GetId().compare(cf_comparator->GetId()) != 0) { + return std::unique_ptr(NewErrorIterator(Status::InvalidArgument( + "Different comparators are being used across CFs"))); + } + } + std::vector child_iterators; + Status s = NewIterators(_read_options, column_families, &child_iterators); + if (s.ok()) { + return std::make_unique(first_comparator, column_families, + std::move(child_iterators)); + } + return std::unique_ptr(NewErrorIterator(s)); +} + Status DBImpl::NewIterators( const ReadOptions& _read_options, const std::vector& column_families, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 327a49344e..29264474bd 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -351,6 +351,13 @@ class DBImpl : public DB { const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; + + // UNDER CONSTRUCTION - DO NOT USE + // Return a cross-column-family iterator from a consistent database state. + std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) override; + // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. If any of them uses it for write conflict checking, then // is_write_conflict_boundary is true. For simplicity, set it to true by diff --git a/db/db_test.cc b/db/db_test.cc index 7f9eda724f..b2c6aab6d6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3199,6 +3199,14 @@ class ModelDB : public DB { std::vector* /*iterators*/) override { return Status::NotSupported("Not supported yet"); } + + // UNDER CONSTRUCTION - DO NOT USE + std::unique_ptr NewMultiCfIterator( + const ReadOptions& /*options*/, + const std::vector& /*column_families*/) override { + return nullptr; + } + const Snapshot* GetSnapshot() override { ModelSnapshot* snapshot = new ModelSnapshot; snapshot->map_ = map_; diff --git a/db/multi_cf_iterator.cc b/db/multi_cf_iterator.cc new file mode 100644 index 0000000000..66a0e85d78 --- /dev/null +++ b/db/multi_cf_iterator.cc @@ -0,0 +1,62 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/multi_cf_iterator.h" + +#include + +namespace ROCKSDB_NAMESPACE { + +void MultiCfIterator::SeekToFirst() { + Reset(); + int i = 0; + for (auto& cfh_iter_pair : cfh_iter_pairs_) { + auto& cfh = cfh_iter_pair.first; + auto& iter = cfh_iter_pair.second; + iter->SeekToFirst(); + if (iter->Valid()) { + assert(iter->status().ok()); + min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i}); + } else { + considerStatus(iter->status()); + } + ++i; + } +} + +void MultiCfIterator::Next() { + assert(Valid()); + // 1. Keep the top iterator (by popping it from the heap) + // 2. Make sure all others have iterated past the top iterator key slice + // 3. Advance the top iterator, and add it back to the heap if valid + auto top = min_heap_.top(); + min_heap_.pop(); + if (!min_heap_.empty()) { + auto* current = min_heap_.top().iterator; + while (current->Valid() && + comparator_->Compare(top.iterator->key(), current->key()) == 0) { + assert(current->status().ok()); + current->Next(); + if (current->Valid()) { + min_heap_.replace_top(min_heap_.top()); + } else { + considerStatus(current->status()); + min_heap_.pop(); + } + if (!min_heap_.empty()) { + current = min_heap_.top().iterator; + } + } + } + top.iterator->Next(); + if (top.iterator->Valid()) { + assert(top.iterator->status().ok()); + min_heap_.push(top); + } else { + considerStatus(top.iterator->status()); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator.h b/db/multi_cf_iterator.h new file mode 100644 index 0000000000..77a42cfc04 --- /dev/null +++ b/db/multi_cf_iterator.h @@ -0,0 +1,116 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "util/heap.h" + +namespace ROCKSDB_NAMESPACE { + +// UNDER CONSTRUCTION - DO NOT USE +// A cross-column-family iterator from a consistent database state. +// When the same key exists in more than one column families, the iterator +// selects the value from the first column family containing the key, in the +// order provided in the `column_families` parameter. +class MultiCfIterator : public Iterator { + public: + MultiCfIterator(const Comparator* comparator, + const std::vector& column_families, + const std::vector& child_iterators) + : comparator_(comparator), + min_heap_(MultiCfMinHeapItemComparator(comparator_)) { + assert(column_families.size() > 0 && + column_families.size() == child_iterators.size()); + cfh_iter_pairs_.reserve(column_families.size()); + for (size_t i = 0; i < column_families.size(); ++i) { + cfh_iter_pairs_.emplace_back( + column_families[i], std::unique_ptr(child_iterators[i])); + } + } + ~MultiCfIterator() override { status_.PermitUncheckedError(); } + + // No copy allowed + MultiCfIterator(const MultiCfIterator&) = delete; + MultiCfIterator& operator=(const MultiCfIterator&) = delete; + + private: + std::vector>> + cfh_iter_pairs_; + ReadOptions read_options_; + Status status_; + + AttributeGroups attribute_groups_; + + struct MultiCfIteratorInfo { + Iterator* iterator; + ColumnFamilyHandle* cfh; + int order; + }; + + class MultiCfMinHeapItemComparator { + public: + explicit MultiCfMinHeapItemComparator(const Comparator* comparator) + : comparator_(comparator) {} + + bool operator()(const MultiCfIteratorInfo& a, + const MultiCfIteratorInfo& b) const { + assert(a.iterator); + assert(b.iterator); + assert(a.iterator->Valid()); + assert(b.iterator->Valid()); + int c = comparator_->Compare(a.iterator->key(), b.iterator->key()); + assert(c != 0 || a.order != b.order); + return c == 0 ? a.order - b.order > 0 : c > 0; + } + + private: + const Comparator* comparator_; + }; + + const Comparator* comparator_; + using MultiCfMinHeap = + BinaryHeap; + MultiCfMinHeap min_heap_; + // TODO: MaxHeap for Reverse Iteration + // TODO: Lower and Upper bounds + + Slice key() const override { + assert(Valid()); + return min_heap_.top().iterator->key(); + } + bool Valid() const override { return !min_heap_.empty() && status_.ok(); } + Status status() const override { return status_; } + void considerStatus(Status s) { + if (!s.ok() && status_.ok()) { + status_ = std::move(s); + } + } + void Reset() { + min_heap_.clear(); + status_ = Status::OK(); + } + + void SeekToFirst() override; + void Next() override; + + // TODO - Implement these + void Seek(const Slice& /*target*/) override {} + void SeekForPrev(const Slice& /*target*/) override {} + void SeekToLast() override {} + void Prev() override { assert(false); } + Slice value() const override { + assert(Valid()); + return min_heap_.top().iterator->value(); + } + const WideColumns& columns() const override { + assert(Valid()); + return min_heap_.top().iterator->columns(); + } +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc new file mode 100644 index 0000000000..0b7ef0de91 --- /dev/null +++ b/db/multi_cf_iterator_test.cc @@ -0,0 +1,364 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include + +#include "db/db_test_util.h" + +namespace ROCKSDB_NAMESPACE { + +class MultiCfIteratorTest : public DBTestBase { + public: + MultiCfIteratorTest() + : DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {} + + void verifyMultiCfIterator( + const std::vector& cfhs, + const std::vector& expected_keys, + const std::optional>& expected_values = std::nullopt, + const std::optional>& expected_wide_columns = + std::nullopt, + const std::optional>& + expected_attribute_groups = std::nullopt) { + int i = 0; + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + if (expected_values.has_value()) { + ASSERT_EQ(expected_values.value()[i], iter->value()); + } + if (expected_wide_columns.has_value()) { + ASSERT_EQ(expected_wide_columns.value()[i], iter->columns()); + } + if (expected_attribute_groups.has_value()) { + // TODO - Add this back when attribute_groups() API is added + // ASSERT_EQ(expected_attribute_groups.value()[i], + // iter->attribute_groups()); + } + if (expected_values.has_value()) { + ASSERT_EQ(expected_values.value()[i], iter->value()); + } + ++i; + } + ASSERT_EQ(expected_keys.size(), i); + ASSERT_OK(iter->status()); + } + + void verifyExpectedKeys(ColumnFamilyHandle* cfh, + const std::vector& expected_keys) { + int i = 0; + Iterator* iter = db_->NewIterator(ReadOptions(), cfh); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + ++i; + } + ASSERT_EQ(expected_keys.size(), i); + ASSERT_OK(iter->status()); + delete iter; + } +}; + +TEST_F(MultiCfIteratorTest, InvalidArguments) { + Options options = GetDefaultOptions(); + { + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + // Invalid - No CF is provided + std::unique_ptr iter_with_no_cf = + db_->NewMultiCfIterator(ReadOptions(), {}); + ASSERT_NOK(iter_with_no_cf->status()); + ASSERT_TRUE(iter_with_no_cf->status().IsInvalidArgument()); + } +} + +TEST_F(MultiCfIteratorTest, SimpleValues) { + Options options = GetDefaultOptions(); + { + // Case 1: Unique key per CF + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_3", "key_3_cf_2_val")); + ASSERT_OK(Put(3, "key_4", "key_4_cf_3_val")); + + std::vector expected_keys = {"key_1", "key_2", "key_3", "key_4"}; + std::vector expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", + "key_3_cf_2_val", "key_4_cf_3_val"}; + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values); + + // Test for iteration over CF 3->1->default_cf->2 + std::vector cfhs_order_3_1_0_2 = { + handles_[3], handles_[1], handles_[0], handles_[2]}; + // Iteration order and the return values should be the same since keys are + // unique per CF + verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values); + } + { + // Case 2: Same key in multiple CFs + options = CurrentOptions(options); + DestroyAndReopen(options); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val")); + ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val")); + ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val")); + ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val")); + + std::vector expected_keys = {"key_1", "key_2", "key_3"}; + + // Test for iteration over CFs default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", + "key_3_cf_0_val"}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values); + + // Test for iteration over CFs 3->2->default_cf->1 + std::vector cfhs_order_3_2_0_1 = { + handles_[3], handles_[2], handles_[0], handles_[1]}; + expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"}; + verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values); + } +} + +TEST_F(MultiCfIteratorTest, WideColumns) { + // Set up the DB and Column Families + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + constexpr char key_1[] = "key_1"; + WideColumns key_1_columns_in_cf_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + WideColumns key_1_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + + constexpr char key_2[] = "key_2"; + WideColumns key_2_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + WideColumns key_2_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + + constexpr char key_3[] = "key_3"; + WideColumns key_3_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; + WideColumns key_3_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + + constexpr char key_4[] = "key_4"; + WideColumns key_4_columns_in_cf_0{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; + WideColumns key_4_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + + // Use AttributeGroup PutEntity API to insert them together + AttributeGroups key_1_attribute_groups{ + AttributeGroup(handles_[2], key_1_columns_in_cf_2), + AttributeGroup(handles_[3], key_1_columns_in_cf_3)}; + AttributeGroups key_2_attribute_groups{ + AttributeGroup(handles_[1], key_2_columns_in_cf_1), + AttributeGroup(handles_[2], key_2_columns_in_cf_2)}; + AttributeGroups key_3_attribute_groups{ + AttributeGroup(handles_[1], key_3_columns_in_cf_1), + AttributeGroup(handles_[3], key_3_columns_in_cf_3)}; + AttributeGroups key_4_attribute_groups{ + AttributeGroup(handles_[0], key_4_columns_in_cf_0), + AttributeGroup(handles_[2], key_4_columns_in_cf_2)}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_keys = {key_1, key_2, key_3, key_4}; + // Pick what DBIter would return for value() in the first CF that key exists + // Since value for kDefaultWideColumnName only exists for key_1, rest will + // return empty value + std::vector expected_values = {"cf_2_col_val_0_key_1", "", "", ""}; + + // Pick columns from the first CF that the key exists and value is stored as + // wide column + std::vector expected_wide_columns = { + {{kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}, + {{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}, + {{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}, + {{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values, + expected_wide_columns); +} + +TEST_F(MultiCfIteratorTest, DifferentComparatorsInMultiCFs) { + // This test creates two column families with two different comparators. + // Attempting to create the MultiCFIterator should fail. + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + options.comparator = BytewiseComparator(); + CreateColumnFamilies({"cf_forward"}, options); + options.comparator = ReverseBytewiseComparator(); + CreateColumnFamilies({"cf_reverse"}, options); + + ASSERT_OK(Put(0, "key_1", "value_1")); + ASSERT_OK(Put(0, "key_2", "value_2")); + ASSERT_OK(Put(0, "key_3", "value_3")); + ASSERT_OK(Put(1, "key_1", "value_1")); + ASSERT_OK(Put(1, "key_2", "value_2")); + ASSERT_OK(Put(1, "key_3", "value_3")); + + verifyExpectedKeys(handles_[0], {"key_1", "key_2", "key_3"}); + verifyExpectedKeys(handles_[1], {"key_3", "key_2", "key_1"}); + + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), handles_); + ASSERT_NOK(iter->status()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); +} + +TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) { + // This test creates two column families with the same custom test + // comparators (but instantiated independently). Attempting to create the + // MultiCFIterator should not fail. + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + static auto comparator_1 = + std::make_unique( + test::SimpleSuffixReverseComparator()); + static auto comparator_2 = + std::make_unique( + test::SimpleSuffixReverseComparator()); + ASSERT_NE(comparator_1, comparator_2); + + options.comparator = comparator_1.get(); + CreateColumnFamilies({"cf_1"}, options); + options.comparator = comparator_2.get(); + CreateColumnFamilies({"cf_2"}, options); + + ASSERT_OK(Put(0, "key_001_001", "value_0_3")); + ASSERT_OK(Put(0, "key_001_002", "value_0_2")); + ASSERT_OK(Put(0, "key_001_003", "value_0_1")); + ASSERT_OK(Put(0, "key_002_001", "value_0_6")); + ASSERT_OK(Put(0, "key_002_002", "value_0_5")); + ASSERT_OK(Put(0, "key_002_003", "value_0_4")); + ASSERT_OK(Put(1, "key_001_001", "value_1_3")); + ASSERT_OK(Put(1, "key_001_002", "value_1_2")); + ASSERT_OK(Put(1, "key_001_003", "value_1_1")); + ASSERT_OK(Put(1, "key_003_004", "value_1_6")); + ASSERT_OK(Put(1, "key_003_005", "value_1_5")); + ASSERT_OK(Put(1, "key_003_006", "value_1_4")); + + verifyExpectedKeys( + handles_[0], {"key_001_003", "key_001_002", "key_001_001", "key_002_003", + "key_002_002", "key_002_001"}); + verifyExpectedKeys( + handles_[1], {"key_001_003", "key_001_002", "key_001_001", "key_003_006", + "key_003_005", "key_003_004"}); + + std::vector expected_keys = { + "key_001_003", "key_001_002", "key_001_001", "key_002_003", "key_002_002", + "key_002_001", "key_003_006", "key_003_005", "key_003_004"}; + std::vector expected_values = {"value_0_1", "value_0_2", "value_0_3", + "value_0_4", "value_0_5", "value_0_6", + "value_1_4", "value_1_5", "value_1_6"}; + int i = 0; + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), handles_); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + ASSERT_EQ(expected_values[i], iter->value()); + ++i; + } + ASSERT_OK(iter->status()); +} + +TEST_F(MultiCfIteratorTest, DISABLED_IterateAttributeGroups) { + // Set up the DB and Column Families + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + constexpr char key_1[] = "key_1"; + WideColumns key_1_columns_in_cf_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + WideColumns key_1_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + + constexpr char key_2[] = "key_2"; + WideColumns key_2_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + WideColumns key_2_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + + constexpr char key_3[] = "key_3"; + WideColumns key_3_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; + WideColumns key_3_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + + constexpr char key_4[] = "key_4"; + WideColumns key_4_columns_in_cf_0{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; + WideColumns key_4_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + + AttributeGroups key_1_attribute_groups{ + AttributeGroup(handles_[2], key_1_columns_in_cf_2), + AttributeGroup(handles_[3], key_1_columns_in_cf_3)}; + AttributeGroups key_2_attribute_groups{ + AttributeGroup(handles_[1], key_2_columns_in_cf_1), + AttributeGroup(handles_[2], key_2_columns_in_cf_2)}; + AttributeGroups key_3_attribute_groups{ + AttributeGroup(handles_[1], key_3_columns_in_cf_1), + AttributeGroup(handles_[3], key_3_columns_in_cf_3)}; + AttributeGroups key_4_attribute_groups{ + AttributeGroup(handles_[0], key_4_columns_in_cf_0), + AttributeGroup(handles_[2], key_4_columns_in_cf_2)}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_keys = {key_1, key_2, key_3, key_4}; + std::vector expected_attribute_groups = { + key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups, + key_4_attribute_groups}; + verifyMultiCfIterator( + cfhs_order_0_1_2_3, expected_keys, std::nullopt /* expected_values */, + std::nullopt /* expected_wide_columns */, expected_attribute_groups); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/wide/wide_columns.cc b/db/wide/wide_columns.cc index 186be7f854..61f56d715f 100644 --- a/db/wide/wide_columns.cc +++ b/db/wide/wide_columns.cc @@ -12,6 +12,7 @@ namespace ROCKSDB_NAMESPACE { const Slice kDefaultWideColumnName; const WideColumns kNoWideColumns; +const AttributeGroups kNoAttributeGroups; Status PinnableWideColumns::CreateIndexForWideColumns() { Slice value_copy = value_; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 7fe13278e2..6a5b1eb038 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -955,6 +955,15 @@ class DB { const std::vector& column_families, std::vector* iterators) = 0; + // UNDER CONSTRUCTION - DO NOT USE + // Return a cross-column-family iterator from a consistent database state. + // When the same key is present in multiple column families, the iterator + // selects the value or columns from the first column family containing the + // key, in the order specified by the `column_families` parameter. + virtual std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) = 0; + // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB // state. The caller must call ReleaseSnapshot(result) when the diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 437b6cb490..0aa6a65ebc 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -259,6 +259,13 @@ class StackableDB : public DB { return db_->NewIterators(options, column_families, iterators); } + using DB::NewMultiCfIterator; + std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) override { + return db_->NewMultiCfIterator(options, column_families); + } + const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } void ReleaseSnapshot(const Snapshot* snapshot) override { diff --git a/include/rocksdb/wide_columns.h b/include/rocksdb/wide_columns.h index 35b81268be..7976375510 100644 --- a/include/rocksdb/wide_columns.h +++ b/include/rocksdb/wide_columns.h @@ -238,9 +238,17 @@ class AttributeGroup { WideColumns columns_; }; +inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) { + return lhs.column_family() == rhs.column_family() && + lhs.columns() == rhs.columns(); +} + // A collection of Attribute Groups. using AttributeGroups = std::vector; +// An empty set of Attribute Groups. +extern const AttributeGroups kNoAttributeGroups; + // Used in Read Path. Wide-columns returned from the query are pinnable. class PinnableAttributeGroup { public: diff --git a/src.mk b/src.mk index 3acefe7803..b19443f64c 100644 --- a/src.mk +++ b/src.mk @@ -76,6 +76,7 @@ LIB_SOURCES = \ db/memtable_list.cc \ db/merge_helper.cc \ db/merge_operator.cc \ + db/multi_cf_iterator.cc \ db/output_validator.cc \ db/periodic_task_scheduler.cc \ db/range_del_aggregator.cc \ @@ -516,6 +517,7 @@ TEST_MAIN_SOURCES = \ db/memtable_list_test.cc \ db/merge_helper_test.cc \ db/merge_test.cc \ + db/multi_cf_iterator_test.cc \ db/obsolete_files_test.cc \ db/options_file_test.cc \ db/perf_context_test.cc \