diff --git a/CMakeLists.txt b/CMakeLists.txt index 5cfc1b4803..24d0f3ef61 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -191,6 +191,7 @@ else() endif() if(MINGW) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj") add_definitions(-D_POSIX_C_SOURCE=1) endif() if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") @@ -691,6 +692,7 @@ set(SOURCES db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc + db/multi_cf_iterator.cc db/output_validator.cc db/periodic_task_scheduler.cc db/range_del_aggregator.cc @@ -1343,6 +1345,7 @@ if(WITH_TESTS) db/memtable_list_test.cc db/merge_helper_test.cc db/merge_test.cc + db/multi_cf_iterator_test.cc db/options_file_test.cc db/perf_context_test.cc db/periodic_task_scheduler_test.cc diff --git a/Makefile b/Makefile index 74a35d3fe8..cafefb4954 100644 --- a/Makefile +++ b/Makefile @@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY) dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index e8aaf325d4..73ddeabc31 100644 --- a/TARGETS +++ b/TARGETS @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/memtable_list.cc", "db/merge_helper.cc", "db/merge_operator.cc", + "db/multi_cf_iterator.cc", "db/output_validator.cc", "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="multi_cf_iterator_test", + srcs=["db/multi_cf_iterator_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="object_registry_test", srcs=["utilities/object_registry_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a8ea205b4a..9e4c3b0c19 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -44,7 +44,7 @@ #include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" -#include "db/merge_helper.h" +#include "db/multi_cf_iterator.h" #include "db/periodic_task_scheduler.h" #include "db/range_tombstone_fragmenter.h" #include "db/table_cache.h" @@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl( return db_iter; } +std::unique_ptr DBImpl::NewMultiCfIterator( + const ReadOptions& _read_options, + const std::vector& column_families) { + if (column_families.size() == 0) { + return std::unique_ptr(NewErrorIterator( + Status::InvalidArgument("No Column Family was provided"))); + } + const Comparator* first_comparator = column_families[0]->GetComparator(); + for (size_t i = 1; i < column_families.size(); ++i) { + const Comparator* cf_comparator = column_families[i]->GetComparator(); + if (first_comparator != cf_comparator && + first_comparator->GetId().compare(cf_comparator->GetId()) != 0) { + return std::unique_ptr(NewErrorIterator(Status::InvalidArgument( + "Different comparators are being used across CFs"))); + } + } + std::vector child_iterators; + Status s = NewIterators(_read_options, column_families, &child_iterators); + if (s.ok()) { + return std::make_unique(first_comparator, column_families, + std::move(child_iterators)); + } + return std::unique_ptr(NewErrorIterator(s)); +} + Status DBImpl::NewIterators( const ReadOptions& _read_options, const std::vector& column_families, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 327a49344e..29264474bd 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -351,6 +351,13 @@ class DBImpl : public DB { const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; + + // UNDER CONSTRUCTION - DO NOT USE + // Return a cross-column-family iterator from a consistent database state. + std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) override; + // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. If any of them uses it for write conflict checking, then // is_write_conflict_boundary is true. For simplicity, set it to true by diff --git a/db/db_test.cc b/db/db_test.cc index 7f9eda724f..b2c6aab6d6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3199,6 +3199,14 @@ class ModelDB : public DB { std::vector* /*iterators*/) override { return Status::NotSupported("Not supported yet"); } + + // UNDER CONSTRUCTION - DO NOT USE + std::unique_ptr NewMultiCfIterator( + const ReadOptions& /*options*/, + const std::vector& /*column_families*/) override { + return nullptr; + } + const Snapshot* GetSnapshot() override { ModelSnapshot* snapshot = new ModelSnapshot; snapshot->map_ = map_; diff --git a/db/multi_cf_iterator.cc b/db/multi_cf_iterator.cc new file mode 100644 index 0000000000..66a0e85d78 --- /dev/null +++ b/db/multi_cf_iterator.cc @@ -0,0 +1,62 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/multi_cf_iterator.h" + +#include + +namespace ROCKSDB_NAMESPACE { + +void MultiCfIterator::SeekToFirst() { + Reset(); + int i = 0; + for (auto& cfh_iter_pair : cfh_iter_pairs_) { + auto& cfh = cfh_iter_pair.first; + auto& iter = cfh_iter_pair.second; + iter->SeekToFirst(); + if (iter->Valid()) { + assert(iter->status().ok()); + min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i}); + } else { + considerStatus(iter->status()); + } + ++i; + } +} + +void MultiCfIterator::Next() { + assert(Valid()); + // 1. Keep the top iterator (by popping it from the heap) + // 2. Make sure all others have iterated past the top iterator key slice + // 3. Advance the top iterator, and add it back to the heap if valid + auto top = min_heap_.top(); + min_heap_.pop(); + if (!min_heap_.empty()) { + auto* current = min_heap_.top().iterator; + while (current->Valid() && + comparator_->Compare(top.iterator->key(), current->key()) == 0) { + assert(current->status().ok()); + current->Next(); + if (current->Valid()) { + min_heap_.replace_top(min_heap_.top()); + } else { + considerStatus(current->status()); + min_heap_.pop(); + } + if (!min_heap_.empty()) { + current = min_heap_.top().iterator; + } + } + } + top.iterator->Next(); + if (top.iterator->Valid()) { + assert(top.iterator->status().ok()); + min_heap_.push(top); + } else { + considerStatus(top.iterator->status()); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator.h b/db/multi_cf_iterator.h new file mode 100644 index 0000000000..77a42cfc04 --- /dev/null +++ b/db/multi_cf_iterator.h @@ -0,0 +1,116 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "util/heap.h" + +namespace ROCKSDB_NAMESPACE { + +// UNDER CONSTRUCTION - DO NOT USE +// A cross-column-family iterator from a consistent database state. +// When the same key exists in more than one column families, the iterator +// selects the value from the first column family containing the key, in the +// order provided in the `column_families` parameter. +class MultiCfIterator : public Iterator { + public: + MultiCfIterator(const Comparator* comparator, + const std::vector& column_families, + const std::vector& child_iterators) + : comparator_(comparator), + min_heap_(MultiCfMinHeapItemComparator(comparator_)) { + assert(column_families.size() > 0 && + column_families.size() == child_iterators.size()); + cfh_iter_pairs_.reserve(column_families.size()); + for (size_t i = 0; i < column_families.size(); ++i) { + cfh_iter_pairs_.emplace_back( + column_families[i], std::unique_ptr(child_iterators[i])); + } + } + ~MultiCfIterator() override { status_.PermitUncheckedError(); } + + // No copy allowed + MultiCfIterator(const MultiCfIterator&) = delete; + MultiCfIterator& operator=(const MultiCfIterator&) = delete; + + private: + std::vector>> + cfh_iter_pairs_; + ReadOptions read_options_; + Status status_; + + AttributeGroups attribute_groups_; + + struct MultiCfIteratorInfo { + Iterator* iterator; + ColumnFamilyHandle* cfh; + int order; + }; + + class MultiCfMinHeapItemComparator { + public: + explicit MultiCfMinHeapItemComparator(const Comparator* comparator) + : comparator_(comparator) {} + + bool operator()(const MultiCfIteratorInfo& a, + const MultiCfIteratorInfo& b) const { + assert(a.iterator); + assert(b.iterator); + assert(a.iterator->Valid()); + assert(b.iterator->Valid()); + int c = comparator_->Compare(a.iterator->key(), b.iterator->key()); + assert(c != 0 || a.order != b.order); + return c == 0 ? a.order - b.order > 0 : c > 0; + } + + private: + const Comparator* comparator_; + }; + + const Comparator* comparator_; + using MultiCfMinHeap = + BinaryHeap; + MultiCfMinHeap min_heap_; + // TODO: MaxHeap for Reverse Iteration + // TODO: Lower and Upper bounds + + Slice key() const override { + assert(Valid()); + return min_heap_.top().iterator->key(); + } + bool Valid() const override { return !min_heap_.empty() && status_.ok(); } + Status status() const override { return status_; } + void considerStatus(Status s) { + if (!s.ok() && status_.ok()) { + status_ = std::move(s); + } + } + void Reset() { + min_heap_.clear(); + status_ = Status::OK(); + } + + void SeekToFirst() override; + void Next() override; + + // TODO - Implement these + void Seek(const Slice& /*target*/) override {} + void SeekForPrev(const Slice& /*target*/) override {} + void SeekToLast() override {} + void Prev() override { assert(false); } + Slice value() const override { + assert(Valid()); + return min_heap_.top().iterator->value(); + } + const WideColumns& columns() const override { + assert(Valid()); + return min_heap_.top().iterator->columns(); + } +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc new file mode 100644 index 0000000000..0b7ef0de91 --- /dev/null +++ b/db/multi_cf_iterator_test.cc @@ -0,0 +1,364 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include + +#include "db/db_test_util.h" + +namespace ROCKSDB_NAMESPACE { + +class MultiCfIteratorTest : public DBTestBase { + public: + MultiCfIteratorTest() + : DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {} + + void verifyMultiCfIterator( + const std::vector& cfhs, + const std::vector& expected_keys, + const std::optional>& expected_values = std::nullopt, + const std::optional>& expected_wide_columns = + std::nullopt, + const std::optional>& + expected_attribute_groups = std::nullopt) { + int i = 0; + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + if (expected_values.has_value()) { + ASSERT_EQ(expected_values.value()[i], iter->value()); + } + if (expected_wide_columns.has_value()) { + ASSERT_EQ(expected_wide_columns.value()[i], iter->columns()); + } + if (expected_attribute_groups.has_value()) { + // TODO - Add this back when attribute_groups() API is added + // ASSERT_EQ(expected_attribute_groups.value()[i], + // iter->attribute_groups()); + } + if (expected_values.has_value()) { + ASSERT_EQ(expected_values.value()[i], iter->value()); + } + ++i; + } + ASSERT_EQ(expected_keys.size(), i); + ASSERT_OK(iter->status()); + } + + void verifyExpectedKeys(ColumnFamilyHandle* cfh, + const std::vector& expected_keys) { + int i = 0; + Iterator* iter = db_->NewIterator(ReadOptions(), cfh); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + ++i; + } + ASSERT_EQ(expected_keys.size(), i); + ASSERT_OK(iter->status()); + delete iter; + } +}; + +TEST_F(MultiCfIteratorTest, InvalidArguments) { + Options options = GetDefaultOptions(); + { + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + // Invalid - No CF is provided + std::unique_ptr iter_with_no_cf = + db_->NewMultiCfIterator(ReadOptions(), {}); + ASSERT_NOK(iter_with_no_cf->status()); + ASSERT_TRUE(iter_with_no_cf->status().IsInvalidArgument()); + } +} + +TEST_F(MultiCfIteratorTest, SimpleValues) { + Options options = GetDefaultOptions(); + { + // Case 1: Unique key per CF + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_3", "key_3_cf_2_val")); + ASSERT_OK(Put(3, "key_4", "key_4_cf_3_val")); + + std::vector expected_keys = {"key_1", "key_2", "key_3", "key_4"}; + std::vector expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", + "key_3_cf_2_val", "key_4_cf_3_val"}; + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values); + + // Test for iteration over CF 3->1->default_cf->2 + std::vector cfhs_order_3_1_0_2 = { + handles_[3], handles_[1], handles_[0], handles_[2]}; + // Iteration order and the return values should be the same since keys are + // unique per CF + verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values); + } + { + // Case 2: Same key in multiple CFs + options = CurrentOptions(options); + DestroyAndReopen(options); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val")); + ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val")); + ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val")); + ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val")); + + std::vector expected_keys = {"key_1", "key_2", "key_3"}; + + // Test for iteration over CFs default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", + "key_3_cf_0_val"}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values); + + // Test for iteration over CFs 3->2->default_cf->1 + std::vector cfhs_order_3_2_0_1 = { + handles_[3], handles_[2], handles_[0], handles_[1]}; + expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"}; + verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values); + } +} + +TEST_F(MultiCfIteratorTest, WideColumns) { + // Set up the DB and Column Families + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + constexpr char key_1[] = "key_1"; + WideColumns key_1_columns_in_cf_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + WideColumns key_1_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + + constexpr char key_2[] = "key_2"; + WideColumns key_2_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + WideColumns key_2_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + + constexpr char key_3[] = "key_3"; + WideColumns key_3_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; + WideColumns key_3_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + + constexpr char key_4[] = "key_4"; + WideColumns key_4_columns_in_cf_0{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; + WideColumns key_4_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + + // Use AttributeGroup PutEntity API to insert them together + AttributeGroups key_1_attribute_groups{ + AttributeGroup(handles_[2], key_1_columns_in_cf_2), + AttributeGroup(handles_[3], key_1_columns_in_cf_3)}; + AttributeGroups key_2_attribute_groups{ + AttributeGroup(handles_[1], key_2_columns_in_cf_1), + AttributeGroup(handles_[2], key_2_columns_in_cf_2)}; + AttributeGroups key_3_attribute_groups{ + AttributeGroup(handles_[1], key_3_columns_in_cf_1), + AttributeGroup(handles_[3], key_3_columns_in_cf_3)}; + AttributeGroups key_4_attribute_groups{ + AttributeGroup(handles_[0], key_4_columns_in_cf_0), + AttributeGroup(handles_[2], key_4_columns_in_cf_2)}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_keys = {key_1, key_2, key_3, key_4}; + // Pick what DBIter would return for value() in the first CF that key exists + // Since value for kDefaultWideColumnName only exists for key_1, rest will + // return empty value + std::vector expected_values = {"cf_2_col_val_0_key_1", "", "", ""}; + + // Pick columns from the first CF that the key exists and value is stored as + // wide column + std::vector expected_wide_columns = { + {{kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}, + {{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}, + {{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}, + {{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}}; + verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values, + expected_wide_columns); +} + +TEST_F(MultiCfIteratorTest, DifferentComparatorsInMultiCFs) { + // This test creates two column families with two different comparators. + // Attempting to create the MultiCFIterator should fail. + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + options.comparator = BytewiseComparator(); + CreateColumnFamilies({"cf_forward"}, options); + options.comparator = ReverseBytewiseComparator(); + CreateColumnFamilies({"cf_reverse"}, options); + + ASSERT_OK(Put(0, "key_1", "value_1")); + ASSERT_OK(Put(0, "key_2", "value_2")); + ASSERT_OK(Put(0, "key_3", "value_3")); + ASSERT_OK(Put(1, "key_1", "value_1")); + ASSERT_OK(Put(1, "key_2", "value_2")); + ASSERT_OK(Put(1, "key_3", "value_3")); + + verifyExpectedKeys(handles_[0], {"key_1", "key_2", "key_3"}); + verifyExpectedKeys(handles_[1], {"key_3", "key_2", "key_1"}); + + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), handles_); + ASSERT_NOK(iter->status()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); +} + +TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) { + // This test creates two column families with the same custom test + // comparators (but instantiated independently). Attempting to create the + // MultiCFIterator should not fail. + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + static auto comparator_1 = + std::make_unique( + test::SimpleSuffixReverseComparator()); + static auto comparator_2 = + std::make_unique( + test::SimpleSuffixReverseComparator()); + ASSERT_NE(comparator_1, comparator_2); + + options.comparator = comparator_1.get(); + CreateColumnFamilies({"cf_1"}, options); + options.comparator = comparator_2.get(); + CreateColumnFamilies({"cf_2"}, options); + + ASSERT_OK(Put(0, "key_001_001", "value_0_3")); + ASSERT_OK(Put(0, "key_001_002", "value_0_2")); + ASSERT_OK(Put(0, "key_001_003", "value_0_1")); + ASSERT_OK(Put(0, "key_002_001", "value_0_6")); + ASSERT_OK(Put(0, "key_002_002", "value_0_5")); + ASSERT_OK(Put(0, "key_002_003", "value_0_4")); + ASSERT_OK(Put(1, "key_001_001", "value_1_3")); + ASSERT_OK(Put(1, "key_001_002", "value_1_2")); + ASSERT_OK(Put(1, "key_001_003", "value_1_1")); + ASSERT_OK(Put(1, "key_003_004", "value_1_6")); + ASSERT_OK(Put(1, "key_003_005", "value_1_5")); + ASSERT_OK(Put(1, "key_003_006", "value_1_4")); + + verifyExpectedKeys( + handles_[0], {"key_001_003", "key_001_002", "key_001_001", "key_002_003", + "key_002_002", "key_002_001"}); + verifyExpectedKeys( + handles_[1], {"key_001_003", "key_001_002", "key_001_001", "key_003_006", + "key_003_005", "key_003_004"}); + + std::vector expected_keys = { + "key_001_003", "key_001_002", "key_001_001", "key_002_003", "key_002_002", + "key_002_001", "key_003_006", "key_003_005", "key_003_004"}; + std::vector expected_values = {"value_0_1", "value_0_2", "value_0_3", + "value_0_4", "value_0_5", "value_0_6", + "value_1_4", "value_1_5", "value_1_6"}; + int i = 0; + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), handles_); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(expected_keys[i], iter->key()); + ASSERT_EQ(expected_values[i], iter->value()); + ++i; + } + ASSERT_OK(iter->status()); +} + +TEST_F(MultiCfIteratorTest, DISABLED_IterateAttributeGroups) { + // Set up the DB and Column Families + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + constexpr char key_1[] = "key_1"; + WideColumns key_1_columns_in_cf_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + WideColumns key_1_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + + constexpr char key_2[] = "key_2"; + WideColumns key_2_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + WideColumns key_2_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + + constexpr char key_3[] = "key_3"; + WideColumns key_3_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; + WideColumns key_3_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + + constexpr char key_4[] = "key_4"; + WideColumns key_4_columns_in_cf_0{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; + WideColumns key_4_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + + AttributeGroups key_1_attribute_groups{ + AttributeGroup(handles_[2], key_1_columns_in_cf_2), + AttributeGroup(handles_[3], key_1_columns_in_cf_3)}; + AttributeGroups key_2_attribute_groups{ + AttributeGroup(handles_[1], key_2_columns_in_cf_1), + AttributeGroup(handles_[2], key_2_columns_in_cf_2)}; + AttributeGroups key_3_attribute_groups{ + AttributeGroup(handles_[1], key_3_columns_in_cf_1), + AttributeGroup(handles_[3], key_3_columns_in_cf_3)}; + AttributeGroups key_4_attribute_groups{ + AttributeGroup(handles_[0], key_4_columns_in_cf_0), + AttributeGroup(handles_[2], key_4_columns_in_cf_2)}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); + + // Test for iteration over CF default->1->2->3 + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + std::vector expected_keys = {key_1, key_2, key_3, key_4}; + std::vector expected_attribute_groups = { + key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups, + key_4_attribute_groups}; + verifyMultiCfIterator( + cfhs_order_0_1_2_3, expected_keys, std::nullopt /* expected_values */, + std::nullopt /* expected_wide_columns */, expected_attribute_groups); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/wide/wide_columns.cc b/db/wide/wide_columns.cc index 186be7f854..61f56d715f 100644 --- a/db/wide/wide_columns.cc +++ b/db/wide/wide_columns.cc @@ -12,6 +12,7 @@ namespace ROCKSDB_NAMESPACE { const Slice kDefaultWideColumnName; const WideColumns kNoWideColumns; +const AttributeGroups kNoAttributeGroups; Status PinnableWideColumns::CreateIndexForWideColumns() { Slice value_copy = value_; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 7fe13278e2..6a5b1eb038 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -955,6 +955,15 @@ class DB { const std::vector& column_families, std::vector* iterators) = 0; + // UNDER CONSTRUCTION - DO NOT USE + // Return a cross-column-family iterator from a consistent database state. + // When the same key is present in multiple column families, the iterator + // selects the value or columns from the first column family containing the + // key, in the order specified by the `column_families` parameter. + virtual std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) = 0; + // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB // state. The caller must call ReleaseSnapshot(result) when the diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 437b6cb490..0aa6a65ebc 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -259,6 +259,13 @@ class StackableDB : public DB { return db_->NewIterators(options, column_families, iterators); } + using DB::NewMultiCfIterator; + std::unique_ptr NewMultiCfIterator( + const ReadOptions& options, + const std::vector& column_families) override { + return db_->NewMultiCfIterator(options, column_families); + } + const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } void ReleaseSnapshot(const Snapshot* snapshot) override { diff --git a/include/rocksdb/wide_columns.h b/include/rocksdb/wide_columns.h index 35b81268be..7976375510 100644 --- a/include/rocksdb/wide_columns.h +++ b/include/rocksdb/wide_columns.h @@ -238,9 +238,17 @@ class AttributeGroup { WideColumns columns_; }; +inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) { + return lhs.column_family() == rhs.column_family() && + lhs.columns() == rhs.columns(); +} + // A collection of Attribute Groups. using AttributeGroups = std::vector; +// An empty set of Attribute Groups. +extern const AttributeGroups kNoAttributeGroups; + // Used in Read Path. Wide-columns returned from the query are pinnable. class PinnableAttributeGroup { public: diff --git a/src.mk b/src.mk index 3acefe7803..b19443f64c 100644 --- a/src.mk +++ b/src.mk @@ -76,6 +76,7 @@ LIB_SOURCES = \ db/memtable_list.cc \ db/merge_helper.cc \ db/merge_operator.cc \ + db/multi_cf_iterator.cc \ db/output_validator.cc \ db/periodic_task_scheduler.cc \ db/range_del_aggregator.cc \ @@ -516,6 +517,7 @@ TEST_MAIN_SOURCES = \ db/memtable_list_test.cc \ db/merge_helper_test.cc \ db/merge_test.cc \ + db/multi_cf_iterator_test.cc \ db/obsolete_files_test.cc \ db/options_file_test.cc \ db/perf_context_test.cc \