Support allow_unprepared_value for multi-CF iterators (#13079)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13079

The patch adds support for the new read option `allow_unprepared_value` to the multi-column-family iterators `CoalescingIterator` and `AttributeGroupIterator`. When this option is set, these iterators populate their value (`value()` + `columns()` or `attribute_groups()`) in an on-demand fashion when `PrepareValue()` is called. Calling `PrepareValue()` on the child iterators is similarly deferred until `PrepareValue()` is called on the main iterator.

Reviewed By: jowlyzhang

Differential Revision: D64570587

fbshipit-source-id: 783c8d408ad10074417dabca7b82c5e1fe5cab36
This commit is contained in:
Levi Tamasi 2024-10-20 20:53:08 -07:00 committed by Facebook GitHub Bot
parent 0ca691654f
commit c0be6a4b90
9 changed files with 328 additions and 39 deletions

View File

@ -13,11 +13,11 @@ namespace ROCKSDB_NAMESPACE {
class AttributeGroupIteratorImpl : public AttributeGroupIterator {
public:
AttributeGroupIteratorImpl(
const Comparator* comparator,
const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, column_families, child_iterators, ResetFunc(this),
PopulateFunc(this)) {}
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
~AttributeGroupIteratorImpl() override {}
// No copy allowed
@ -42,6 +42,8 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator {
void Reset() { attribute_groups_.clear(); }
bool PrepareValue() override { return impl_.PrepareValue(); }
private:
class ResetFunc {
public:

View File

@ -12,11 +12,11 @@ namespace ROCKSDB_NAMESPACE {
// EXPERIMENTAL
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const Comparator* comparator,
CoalescingIterator(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, column_families, child_iterators, ResetFunc(this),
PopulateFunc(this)) {}
: impl_(comparator, allow_unprepared_value, column_families,
child_iterators, ResetFunc(this), PopulateFunc(this)) {}
~CoalescingIterator() override {}
// No copy allowed
@ -47,6 +47,8 @@ class CoalescingIterator : public Iterator {
wide_columns_.clear();
}
bool PrepareValue() override { return impl_.PrepareValue(); }
private:
class ResetFunc {
public:

View File

@ -3992,9 +3992,9 @@ std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
if (!s.ok()) {
return error_iterator_func(s);
}
return std::make_unique<ImplType>(column_families[0]->GetComparator(),
column_families,
std::move(child_iterators));
return std::make_unique<ImplType>(
column_families[0]->GetComparator(), _read_options.allow_unprepared_value,
column_families, std::move(child_iterators));
}
Status DBImpl::NewIterators(

View File

@ -24,11 +24,12 @@ struct MultiCfIteratorInfo {
template <typename ResetFunc, typename PopulateFunc>
class MultiCfIteratorImpl {
public:
MultiCfIteratorImpl(const Comparator* comparator,
MultiCfIteratorImpl(const Comparator* comparator, bool allow_unprepared_value,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators,
ResetFunc reset_func, PopulateFunc populate_func)
: comparator_(comparator),
allow_unprepared_value_(allow_unprepared_value),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))),
reset_func_(std::move(reset_func)),
@ -100,6 +101,41 @@ class MultiCfIteratorImpl {
AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); });
}
bool PrepareValue() {
assert(Valid());
if (!allow_unprepared_value_) {
return true;
}
auto prepare_value_func = [this](auto& heap, Iterator* iterator) {
assert(iterator);
assert(iterator->Valid());
assert(iterator->status().ok());
if (!iterator->PrepareValue()) {
assert(!iterator->Valid());
assert(!iterator->status().ok());
considerStatus(iterator->status());
assert(!status_.ok());
heap.clear();
return false;
}
return true;
};
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
return PopulateIterator(std::get<MultiCfMaxHeap>(heap_),
prepare_value_func);
}
return PopulateIterator(std::get<MultiCfMinHeap>(heap_),
prepare_value_func);
}
private:
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
@ -124,7 +160,10 @@ class MultiCfIteratorImpl {
private:
const Comparator* comparator_;
};
const Comparator* comparator_;
bool allow_unprepared_value_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
@ -185,13 +224,16 @@ class MultiCfIteratorImpl {
if (!status_.ok()) {
// Non-OK status from the iterator. Bail out early
heap.clear();
break;
return;
}
}
++i;
}
if (!heap.empty()) {
PopulateIterator(heap);
if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
}
}
@ -257,13 +299,16 @@ class MultiCfIteratorImpl {
}
}
if (!heap.empty()) {
PopulateIterator(heap);
if (!allow_unprepared_value_ && !heap.empty()) {
[[maybe_unused]] const bool result = PopulateIterator(
heap,
[](auto& /* heap */, Iterator* /* iterator */) { return true; });
assert(result);
}
}
template <typename BinaryHeap>
void PopulateIterator(BinaryHeap& heap) {
template <typename BinaryHeap, typename PrepareValueFunc>
bool PopulateIterator(BinaryHeap& heap, PrepareValueFunc prepare_value_func) {
// 1. Keep the top iterator (by popping it from the heap) and add it to list
// to populate
// 2. For all non-top iterators having the same key as top iter popped
@ -279,10 +324,14 @@ class MultiCfIteratorImpl {
assert(top.iterator->Valid());
assert(top.iterator->status().ok());
heap.pop();
if (!prepare_value_func(heap, top.iterator)) {
return false;
}
autovector<MultiCfIteratorInfo> to_populate;
to_populate.push_back(top);
heap.pop();
while (!heap.empty()) {
auto current = heap.top();
@ -295,6 +344,10 @@ class MultiCfIteratorImpl {
break;
}
if (!prepare_value_func(heap, current.iterator)) {
return false;
}
to_populate.push_back(current);
heap.pop();
}
@ -305,6 +358,8 @@ class MultiCfIteratorImpl {
}
populate_func_(to_populate);
return true;
}
};

View File

@ -21,18 +21,27 @@ class CoalescingIteratorTest : public DBTestBase {
const std::optional<std::vector<WideColumns>>&
expected_wide_columns = std::nullopt,
const Slice* lower_bound = nullptr,
const Slice* upper_bound = nullptr) {
const Slice* upper_bound = nullptr,
bool allow_unprepared_value = false) {
const size_t num_keys = expected_keys.size();
ReadOptions read_options;
read_options.iterate_lower_bound = lower_bound;
read_options.iterate_upper_bound = upper_bound;
read_options.allow_unprepared_value = allow_unprepared_value;
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs);
auto check_iter_entry = [&](size_t idx) {
ASSERT_EQ(iter->key(), expected_keys[idx]);
if (allow_unprepared_value) {
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_TRUE(iter->Valid());
}
ASSERT_EQ(iter->value(), expected_values[idx]);
if (expected_wide_columns.has_value()) {
ASSERT_EQ(iter->columns(), expected_wide_columns.value()[idx]);
@ -781,6 +790,137 @@ TEST_F(CoalescingIteratorTest, CustomComparatorsInMultiCFs) {
ASSERT_OK(iter->status());
}
TEST_F(CoalescingIteratorTest, AllowUnpreparedValue) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val"));
ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val"));
ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val"));
ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val"));
ASSERT_OK(Flush());
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1{handles_[3], handles_[2],
handles_[0], handles_[1]};
std::vector<Slice> expected_keys{"key_1", "key_2", "key_3"};
std::vector<Slice> expected_values{"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_1_val"};
VerifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys, expected_values,
/* expected_wide_columns */ std::nullopt,
/* lower_bound */ nullptr, /* upper_bound */ nullptr,
/* allow_unprepared_value */ true);
ReadOptions read_options;
read_options.allow_unprepared_value = true;
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "key_1->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_2");
ASSERT_EQ(IterStatus(iter.get()), "key_2->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_3->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val");
iter->Seek("key_x");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_2->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->SeekForPrev("key_x");
ASSERT_EQ(IterStatus(iter.get()), "key_3->");
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}
TEST_F(CoalescingIteratorTest, AllowUnpreparedValue_Corruption) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val"));
ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val"));
ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val"));
ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val"));
ASSERT_OK(Flush());
ReadOptions read_options;
read_options.allow_unprepared_value = true;
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1{handles_[3], handles_[2],
handles_[0], handles_[1]};
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_3_2_0_1);
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "key_1");
ASSERT_TRUE(iter->value().empty());
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::GetBlob:TamperWithResult", [](void* arg) {
Slice* const blob_index = static_cast<Slice*>(arg);
assert(blob_index);
assert(!blob_index->empty());
blob_index->remove_prefix(1);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_FALSE(iter->PrepareValue());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class AttributeGroupIteratorTest : public DBTestBase {
public:
AttributeGroupIteratorTest()
@ -790,17 +930,27 @@ class AttributeGroupIteratorTest : public DBTestBase {
const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::vector<IteratorAttributeGroups>& expected_attribute_groups,
const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr) {
const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr,
bool allow_unprepared_value = false) {
const size_t num_keys = expected_keys.size();
ReadOptions read_options;
read_options.iterate_lower_bound = lower_bound;
read_options.iterate_upper_bound = upper_bound;
read_options.allow_unprepared_value = allow_unprepared_value;
std::unique_ptr<AttributeGroupIterator> iter =
db_->NewAttributeGroupIterator(read_options, cfhs);
auto check_iter_entry = [&](size_t idx) {
ASSERT_EQ(iter->key(), expected_keys[idx]);
if (allow_unprepared_value) {
ASSERT_TRUE(iter->attribute_groups().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_TRUE(iter->Valid());
}
ASSERT_EQ(iter->attribute_groups(), expected_attribute_groups[idx]);
};
@ -938,6 +1088,82 @@ TEST_F(AttributeGroupIteratorTest, IterateAttributeGroups) {
}
}
TEST_F(AttributeGroupIteratorTest, AllowUnpreparedValue) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
constexpr char key_1[] = "key_1";
WideColumns key_1_columns_in_cf_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
WideColumns key_1_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};
constexpr char key_2[] = "key_2";
WideColumns key_2_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
WideColumns key_2_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};
constexpr char key_3[] = "key_3";
WideColumns key_3_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
WideColumns key_3_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};
constexpr char key_4[] = "key_4";
WideColumns key_4_columns_in_cf_0{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
WideColumns key_4_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};
AttributeGroups key_1_attribute_groups{
AttributeGroup(handles_[2], key_1_columns_in_cf_2),
AttributeGroup(handles_[3], key_1_columns_in_cf_3)};
AttributeGroups key_2_attribute_groups{
AttributeGroup(handles_[1], key_2_columns_in_cf_1),
AttributeGroup(handles_[2], key_2_columns_in_cf_2)};
AttributeGroups key_3_attribute_groups{
AttributeGroup(handles_[1], key_3_columns_in_cf_1),
AttributeGroup(handles_[3], key_3_columns_in_cf_3)};
AttributeGroups key_4_attribute_groups{
AttributeGroup(handles_[0], key_4_columns_in_cf_0),
AttributeGroup(handles_[2], key_4_columns_in_cf_2)};
ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));
IteratorAttributeGroups key_1_expected_attribute_groups{
IteratorAttributeGroup(key_1_attribute_groups[0]),
IteratorAttributeGroup(key_1_attribute_groups[1])};
IteratorAttributeGroups key_2_expected_attribute_groups{
IteratorAttributeGroup(key_2_attribute_groups[0]),
IteratorAttributeGroup(key_2_attribute_groups[1])};
IteratorAttributeGroups key_3_expected_attribute_groups{
IteratorAttributeGroup(key_3_attribute_groups[0]),
IteratorAttributeGroup(key_3_attribute_groups[1])};
IteratorAttributeGroups key_4_expected_attribute_groups{
IteratorAttributeGroup(key_4_attribute_groups[0]),
IteratorAttributeGroup(key_4_attribute_groups[1])};
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3{handles_[0], handles_[1],
handles_[2], handles_[3]};
std::vector<Slice> expected_keys{key_1, key_2, key_3, key_4};
std::vector<IteratorAttributeGroups> expected_attribute_groups{
key_1_expected_attribute_groups, key_2_expected_attribute_groups,
key_3_expected_attribute_groups, key_4_expected_attribute_groups};
VerifyAttributeGroupIterator(
cfhs_order_0_1_2_3, expected_keys, expected_attribute_groups,
/* lower_bound */ nullptr, /* upper_bound */ nullptr,
/* allow_unprepared_value */ true);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -75,20 +75,22 @@ class IteratorBase : public Cleanable {
}
// When ReadOptions::allow_unprepared_value is set, the iterator may defer
// loading the value when moving to a different entry (i.e. during
// SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev operations). This can be
// used to save on I/O when the values associated with certain keys may not be
// used by the application. When allow_unprepared_value is true, the
// application is expected to call this method before accessing the value to
// ensure it is loaded (for all entries whose values are actually needed).
// Note: it is safe to call this method for entries whose values are already
// loaded.
// loading and/or preparing the value when moving to a different entry (i.e.
// during SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev operations). This
// can be used to save on I/O and/or CPU when the values associated with
// certain keys may not be used by the application. When
// allow_unprepared_value is true, the application is expected to call this
// method before accessing the value to ensure it is prepared (for all entries
// whose values are actually needed). Note: it is safe to call this method for
// entries whose values are already prepared.
//
// Returns true on success. Returns false and sets Valid() to false and
// status() to non-OK if there is an error while loading the value.
// status() to non-OK if there is an error while loading or preparing the
// value.
//
// Note: this method is currently only applicable to large values stored in
// blob files using BlobDB, and has no effect otherwise.
// Note: this option currently only applies to 1) large values stored in blob
// files using BlobDB and 2) multi-column-family iterators (CoalescingIterator
// and AttributeGroupIterator). Otherwise, it has no effect.
//
// REQUIRES: Valid()
virtual bool PrepareValue() { return true; }

View File

@ -1938,14 +1938,15 @@ struct ReadOptions {
// Default: true
bool auto_readahead_size = true;
// When set, the iterator may defer loading the value when moving to a
// different entry (i.e. during SeekToFirst/SeekToLast/Seek/SeekForPrev/
// Next/Prev operations). This can be used to save on I/O when the values
// associated with certain keys may not be used by the application. See also
// IteratorBase::PrepareValue().
// When set, the iterator may defer loading and/or preparing the value when
// moving to a different entry (i.e. during SeekToFirst/SeekToLast/Seek/
// SeekForPrev/Next/Prev operations). This can be used to save on I/O and/or
// CPU when the values associated with certain keys may not be used by the
// application. See also IteratorBase::PrepareValue().
//
// Note: this option currently only applies to large values stored in blob
// files using BlobDB, and has no effect otherwise.
// Note: this option currently only applies to 1) large values stored in blob
// files using BlobDB and 2) multi-column-family iterators (CoalescingIterator
// and AttributeGroupIterator). Otherwise, it has no effect.
//
// Default: false
bool allow_unprepared_value = false;

View File

@ -1 +1 @@
When using iterators with BlobDB, it is now possible to load large values on an on-demand basis, i.e. only if they are actually needed by the application. This can save I/O in use cases where the values associated with certain keys are not needed. For more details, see the new read option `allow_unprepared_value` and the iterator API `PrepareValue`. Currently, this functionality is only supported for regular single-column-family iterators.
When using iterators with BlobDB, it is now possible to load large values on an on-demand basis, i.e. only if they are actually needed by the application. This can save I/O in use cases where the values associated with certain keys are not needed. For more details, see the new read option `allow_unprepared_value` and the iterator API `PrepareValue`.

View File

@ -0,0 +1 @@
The option `allow_unprepared_value` is now also supported for multi-column-family iterators (i.e. `CoalescingIterator` and `AttributeGroupIterator`).