diff --git a/db/attribute_group_iterator_impl.h b/db/attribute_group_iterator_impl.h index 7f73ec7cc9..c6dc2722a3 100644 --- a/db/attribute_group_iterator_impl.h +++ b/db/attribute_group_iterator_impl.h @@ -13,11 +13,11 @@ namespace ROCKSDB_NAMESPACE { class AttributeGroupIteratorImpl : public AttributeGroupIterator { public: AttributeGroupIteratorImpl( - const Comparator* comparator, + const Comparator* comparator, bool allow_unprepared_value, const std::vector& column_families, const std::vector& child_iterators) - : impl_(comparator, column_families, child_iterators, ResetFunc(this), - PopulateFunc(this)) {} + : impl_(comparator, allow_unprepared_value, column_families, + child_iterators, ResetFunc(this), PopulateFunc(this)) {} ~AttributeGroupIteratorImpl() override {} // No copy allowed @@ -42,6 +42,8 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator { void Reset() { attribute_groups_.clear(); } + bool PrepareValue() override { return impl_.PrepareValue(); } + private: class ResetFunc { public: diff --git a/db/coalescing_iterator.h b/db/coalescing_iterator.h index a760d3f40b..c4a1c831ed 100644 --- a/db/coalescing_iterator.h +++ b/db/coalescing_iterator.h @@ -12,11 +12,11 @@ namespace ROCKSDB_NAMESPACE { // EXPERIMENTAL class CoalescingIterator : public Iterator { public: - CoalescingIterator(const Comparator* comparator, + CoalescingIterator(const Comparator* comparator, bool allow_unprepared_value, const std::vector& column_families, const std::vector& child_iterators) - : impl_(comparator, column_families, child_iterators, ResetFunc(this), - PopulateFunc(this)) {} + : impl_(comparator, allow_unprepared_value, column_families, + child_iterators, ResetFunc(this), PopulateFunc(this)) {} ~CoalescingIterator() override {} // No copy allowed @@ -47,6 +47,8 @@ class CoalescingIterator : public Iterator { wide_columns_.clear(); } + bool PrepareValue() override { return impl_.PrepareValue(); } + private: class ResetFunc { public: diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 11b83ad79d..49d64c3cf1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3992,9 +3992,9 @@ std::unique_ptr DBImpl::NewMultiCfIterator( if (!s.ok()) { return error_iterator_func(s); } - return std::make_unique(column_families[0]->GetComparator(), - column_families, - std::move(child_iterators)); + return std::make_unique( + column_families[0]->GetComparator(), _read_options.allow_unprepared_value, + column_families, std::move(child_iterators)); } Status DBImpl::NewIterators( diff --git a/db/multi_cf_iterator_impl.h b/db/multi_cf_iterator_impl.h index aa91955be5..4bea05a5be 100644 --- a/db/multi_cf_iterator_impl.h +++ b/db/multi_cf_iterator_impl.h @@ -24,11 +24,12 @@ struct MultiCfIteratorInfo { template class MultiCfIteratorImpl { public: - MultiCfIteratorImpl(const Comparator* comparator, + MultiCfIteratorImpl(const Comparator* comparator, bool allow_unprepared_value, const std::vector& column_families, const std::vector& child_iterators, ResetFunc reset_func, PopulateFunc populate_func) : comparator_(comparator), + allow_unprepared_value_(allow_unprepared_value), heap_(MultiCfMinHeap( MultiCfHeapItemComparator>(comparator_))), reset_func_(std::move(reset_func)), @@ -100,6 +101,41 @@ class MultiCfIteratorImpl { AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); }); } + bool PrepareValue() { + assert(Valid()); + + if (!allow_unprepared_value_) { + return true; + } + + auto prepare_value_func = [this](auto& heap, Iterator* iterator) { + assert(iterator); + assert(iterator->Valid()); + assert(iterator->status().ok()); + + if (!iterator->PrepareValue()) { + assert(!iterator->Valid()); + assert(!iterator->status().ok()); + + considerStatus(iterator->status()); + assert(!status_.ok()); + + heap.clear(); + return false; + } + + return true; + }; + + if (std::holds_alternative(heap_)) { + return PopulateIterator(std::get(heap_), + prepare_value_func); + } + + return PopulateIterator(std::get(heap_), + prepare_value_func); + } + private: std::vector>> cfh_iter_pairs_; @@ -124,7 +160,10 @@ class MultiCfIteratorImpl { private: const Comparator* comparator_; }; + const Comparator* comparator_; + bool allow_unprepared_value_; + using MultiCfMinHeap = BinaryHeap>>; @@ -185,13 +224,16 @@ class MultiCfIteratorImpl { if (!status_.ok()) { // Non-OK status from the iterator. Bail out early heap.clear(); - break; + return; } } ++i; } - if (!heap.empty()) { - PopulateIterator(heap); + if (!allow_unprepared_value_ && !heap.empty()) { + [[maybe_unused]] const bool result = PopulateIterator( + heap, + [](auto& /* heap */, Iterator* /* iterator */) { return true; }); + assert(result); } } @@ -257,13 +299,16 @@ class MultiCfIteratorImpl { } } - if (!heap.empty()) { - PopulateIterator(heap); + if (!allow_unprepared_value_ && !heap.empty()) { + [[maybe_unused]] const bool result = PopulateIterator( + heap, + [](auto& /* heap */, Iterator* /* iterator */) { return true; }); + assert(result); } } - template - void PopulateIterator(BinaryHeap& heap) { + template + bool PopulateIterator(BinaryHeap& heap, PrepareValueFunc prepare_value_func) { // 1. Keep the top iterator (by popping it from the heap) and add it to list // to populate // 2. For all non-top iterators having the same key as top iter popped @@ -279,10 +324,14 @@ class MultiCfIteratorImpl { assert(top.iterator->Valid()); assert(top.iterator->status().ok()); - heap.pop(); + if (!prepare_value_func(heap, top.iterator)) { + return false; + } autovector to_populate; + to_populate.push_back(top); + heap.pop(); while (!heap.empty()) { auto current = heap.top(); @@ -295,6 +344,10 @@ class MultiCfIteratorImpl { break; } + if (!prepare_value_func(heap, current.iterator)) { + return false; + } + to_populate.push_back(current); heap.pop(); } @@ -305,6 +358,8 @@ class MultiCfIteratorImpl { } populate_func_(to_populate); + + return true; } }; diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc index 6c35918480..0cdf5fae91 100644 --- a/db/multi_cf_iterator_test.cc +++ b/db/multi_cf_iterator_test.cc @@ -21,18 +21,27 @@ class CoalescingIteratorTest : public DBTestBase { const std::optional>& expected_wide_columns = std::nullopt, const Slice* lower_bound = nullptr, - const Slice* upper_bound = nullptr) { + const Slice* upper_bound = nullptr, + bool allow_unprepared_value = false) { const size_t num_keys = expected_keys.size(); ReadOptions read_options; read_options.iterate_lower_bound = lower_bound; read_options.iterate_upper_bound = upper_bound; + read_options.allow_unprepared_value = allow_unprepared_value; std::unique_ptr iter = db_->NewCoalescingIterator(read_options, cfhs); auto check_iter_entry = [&](size_t idx) { ASSERT_EQ(iter->key(), expected_keys[idx]); + + if (allow_unprepared_value) { + ASSERT_TRUE(iter->value().empty()); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_TRUE(iter->Valid()); + } + ASSERT_EQ(iter->value(), expected_values[idx]); if (expected_wide_columns.has_value()) { ASSERT_EQ(iter->columns(), expected_wide_columns.value()[idx]); @@ -781,6 +790,137 @@ TEST_F(CoalescingIteratorTest, CustomComparatorsInMultiCFs) { ASSERT_OK(iter->status()); } +TEST_F(CoalescingIteratorTest, AllowUnpreparedValue) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val")); + ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val")); + ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val")); + ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val")); + + ASSERT_OK(Flush()); + + std::vector cfhs_order_3_2_0_1{handles_[3], handles_[2], + handles_[0], handles_[1]}; + std::vector expected_keys{"key_1", "key_2", "key_3"}; + std::vector expected_values{"key_1_cf_0_val", "key_2_cf_1_val", + "key_3_cf_1_val"}; + + VerifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys, expected_values, + /* expected_wide_columns */ std::nullopt, + /* lower_bound */ nullptr, /* upper_bound */ nullptr, + /* allow_unprepared_value */ true); + + ReadOptions read_options; + read_options.allow_unprepared_value = true; + + { + std::unique_ptr iter = + db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1); + iter->Seek(""); + ASSERT_EQ(IterStatus(iter.get()), "key_1->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + + iter->Seek("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + + iter->Seek("key_2"); + ASSERT_EQ(IterStatus(iter.get()), "key_2->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); + + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_3->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val"); + + iter->Seek("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } + + { + std::unique_ptr iter = + db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + + iter->SeekForPrev("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_2->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); + + iter->SeekForPrev("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "key_3->"); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val"); + + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } +} + +TEST_F(CoalescingIteratorTest, AllowUnpreparedValue_Corruption) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); + ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val")); + ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val")); + ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val")); + ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val")); + ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val")); + ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val")); + + ASSERT_OK(Flush()); + + ReadOptions read_options; + read_options.allow_unprepared_value = true; + + std::vector cfhs_order_3_2_0_1{handles_[3], handles_[2], + handles_[0], handles_[1]}; + + std::unique_ptr iter = + db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1); + iter->SeekToFirst(); + + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "key_1"); + ASSERT_TRUE(iter->value().empty()); + + SyncPoint::GetInstance()->SetCallBack( + "BlobFileReader::GetBlob:TamperWithResult", [](void* arg) { + Slice* const blob_index = static_cast(arg); + assert(blob_index); + assert(!blob_index->empty()); + blob_index->remove_prefix(1); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_FALSE(iter->PrepareValue()); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsCorruption()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + class AttributeGroupIteratorTest : public DBTestBase { public: AttributeGroupIteratorTest() @@ -790,17 +930,27 @@ class AttributeGroupIteratorTest : public DBTestBase { const std::vector& cfhs, const std::vector& expected_keys, const std::vector& expected_attribute_groups, - const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr) { + const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr, + bool allow_unprepared_value = false) { const size_t num_keys = expected_keys.size(); ReadOptions read_options; read_options.iterate_lower_bound = lower_bound; read_options.iterate_upper_bound = upper_bound; + read_options.allow_unprepared_value = allow_unprepared_value; + std::unique_ptr iter = db_->NewAttributeGroupIterator(read_options, cfhs); auto check_iter_entry = [&](size_t idx) { ASSERT_EQ(iter->key(), expected_keys[idx]); + + if (allow_unprepared_value) { + ASSERT_TRUE(iter->attribute_groups().empty()); + ASSERT_TRUE(iter->PrepareValue()); + ASSERT_TRUE(iter->Valid()); + } + ASSERT_EQ(iter->attribute_groups(), expected_attribute_groups[idx]); }; @@ -938,6 +1088,82 @@ TEST_F(AttributeGroupIteratorTest, IterateAttributeGroups) { } } +TEST_F(AttributeGroupIteratorTest, AllowUnpreparedValue) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + constexpr char key_1[] = "key_1"; + WideColumns key_1_columns_in_cf_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + WideColumns key_1_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + + constexpr char key_2[] = "key_2"; + WideColumns key_2_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + WideColumns key_2_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + + constexpr char key_3[] = "key_3"; + WideColumns key_3_columns_in_cf_1{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; + WideColumns key_3_columns_in_cf_3{ + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + + constexpr char key_4[] = "key_4"; + WideColumns key_4_columns_in_cf_0{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; + WideColumns key_4_columns_in_cf_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + + AttributeGroups key_1_attribute_groups{ + AttributeGroup(handles_[2], key_1_columns_in_cf_2), + AttributeGroup(handles_[3], key_1_columns_in_cf_3)}; + AttributeGroups key_2_attribute_groups{ + AttributeGroup(handles_[1], key_2_columns_in_cf_1), + AttributeGroup(handles_[2], key_2_columns_in_cf_2)}; + AttributeGroups key_3_attribute_groups{ + AttributeGroup(handles_[1], key_3_columns_in_cf_1), + AttributeGroup(handles_[3], key_3_columns_in_cf_3)}; + AttributeGroups key_4_attribute_groups{ + AttributeGroup(handles_[0], key_4_columns_in_cf_0), + AttributeGroup(handles_[2], key_4_columns_in_cf_2)}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); + ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); + + IteratorAttributeGroups key_1_expected_attribute_groups{ + IteratorAttributeGroup(key_1_attribute_groups[0]), + IteratorAttributeGroup(key_1_attribute_groups[1])}; + IteratorAttributeGroups key_2_expected_attribute_groups{ + IteratorAttributeGroup(key_2_attribute_groups[0]), + IteratorAttributeGroup(key_2_attribute_groups[1])}; + IteratorAttributeGroups key_3_expected_attribute_groups{ + IteratorAttributeGroup(key_3_attribute_groups[0]), + IteratorAttributeGroup(key_3_attribute_groups[1])}; + IteratorAttributeGroups key_4_expected_attribute_groups{ + IteratorAttributeGroup(key_4_attribute_groups[0]), + IteratorAttributeGroup(key_4_attribute_groups[1])}; + + std::vector cfhs_order_0_1_2_3{handles_[0], handles_[1], + handles_[2], handles_[3]}; + std::vector expected_keys{key_1, key_2, key_3, key_4}; + std::vector expected_attribute_groups{ + key_1_expected_attribute_groups, key_2_expected_attribute_groups, + key_3_expected_attribute_groups, key_4_expected_attribute_groups}; + VerifyAttributeGroupIterator( + cfhs_order_0_1_2_3, expected_keys, expected_attribute_groups, + /* lower_bound */ nullptr, /* upper_bound */ nullptr, + /* allow_unprepared_value */ true); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/iterator_base.h b/include/rocksdb/iterator_base.h index a7b5ac9cda..dc172ffedb 100644 --- a/include/rocksdb/iterator_base.h +++ b/include/rocksdb/iterator_base.h @@ -75,20 +75,22 @@ class IteratorBase : public Cleanable { } // When ReadOptions::allow_unprepared_value is set, the iterator may defer - // loading the value when moving to a different entry (i.e. during - // SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev operations). This can be - // used to save on I/O when the values associated with certain keys may not be - // used by the application. When allow_unprepared_value is true, the - // application is expected to call this method before accessing the value to - // ensure it is loaded (for all entries whose values are actually needed). - // Note: it is safe to call this method for entries whose values are already - // loaded. + // loading and/or preparing the value when moving to a different entry (i.e. + // during SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev operations). This + // can be used to save on I/O and/or CPU when the values associated with + // certain keys may not be used by the application. When + // allow_unprepared_value is true, the application is expected to call this + // method before accessing the value to ensure it is prepared (for all entries + // whose values are actually needed). Note: it is safe to call this method for + // entries whose values are already prepared. // // Returns true on success. Returns false and sets Valid() to false and - // status() to non-OK if there is an error while loading the value. + // status() to non-OK if there is an error while loading or preparing the + // value. // - // Note: this method is currently only applicable to large values stored in - // blob files using BlobDB, and has no effect otherwise. + // Note: this option currently only applies to 1) large values stored in blob + // files using BlobDB and 2) multi-column-family iterators (CoalescingIterator + // and AttributeGroupIterator). Otherwise, it has no effect. // // REQUIRES: Valid() virtual bool PrepareValue() { return true; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9f62209190..9160494ef3 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1938,14 +1938,15 @@ struct ReadOptions { // Default: true bool auto_readahead_size = true; - // When set, the iterator may defer loading the value when moving to a - // different entry (i.e. during SeekToFirst/SeekToLast/Seek/SeekForPrev/ - // Next/Prev operations). This can be used to save on I/O when the values - // associated with certain keys may not be used by the application. See also - // IteratorBase::PrepareValue(). + // When set, the iterator may defer loading and/or preparing the value when + // moving to a different entry (i.e. during SeekToFirst/SeekToLast/Seek/ + // SeekForPrev/Next/Prev operations). This can be used to save on I/O and/or + // CPU when the values associated with certain keys may not be used by the + // application. See also IteratorBase::PrepareValue(). // - // Note: this option currently only applies to large values stored in blob - // files using BlobDB, and has no effect otherwise. + // Note: this option currently only applies to 1) large values stored in blob + // files using BlobDB and 2) multi-column-family iterators (CoalescingIterator + // and AttributeGroupIterator). Otherwise, it has no effect. // // Default: false bool allow_unprepared_value = false; diff --git a/unreleased_history/new_features/blob_db_allow_unprepared_value.md b/unreleased_history/new_features/blob_db_allow_unprepared_value.md index a0bbcc697a..fb655a134f 100644 --- a/unreleased_history/new_features/blob_db_allow_unprepared_value.md +++ b/unreleased_history/new_features/blob_db_allow_unprepared_value.md @@ -1 +1 @@ -When using iterators with BlobDB, it is now possible to load large values on an on-demand basis, i.e. only if they are actually needed by the application. This can save I/O in use cases where the values associated with certain keys are not needed. For more details, see the new read option `allow_unprepared_value` and the iterator API `PrepareValue`. Currently, this functionality is only supported for regular single-column-family iterators. +When using iterators with BlobDB, it is now possible to load large values on an on-demand basis, i.e. only if they are actually needed by the application. This can save I/O in use cases where the values associated with certain keys are not needed. For more details, see the new read option `allow_unprepared_value` and the iterator API `PrepareValue`. diff --git a/unreleased_history/new_features/multi_cf_allow_unprepared_value.md b/unreleased_history/new_features/multi_cf_allow_unprepared_value.md new file mode 100644 index 0000000000..bfb0049878 --- /dev/null +++ b/unreleased_history/new_features/multi_cf_allow_unprepared_value.md @@ -0,0 +1 @@ +The option `allow_unprepared_value` is now also supported for multi-column-family iterators (i.e. `CoalescingIterator` and `AttributeGroupIterator`).