MultiCFIterator Refactor - CoalescingIterator & AttributeGroupIterator (#12480)

Summary:
There are a couple of reasons to modify the current implementation of the MultiCfIterator, which implements the generic `Iterator` interface.
- The default behavior of `value()`/`columns()`, which returns data from different Column Families for different keys, is error-prone, even though there may be valid use cases where users do not care which CF the value/columns came from.
- The `attribute_groups()` API, which is not yet implemented, will not be useful for a single-CF iterator.

In this PR, we are implementing the following changes:
- `IteratorBase` introduced, which includes all basic iterator functions except `value()` and `columns()`.
- `Iterator`, which now inherits from `IteratorBase`, includes `value()` and `columns()`.
- New public interface `AttributeGroupIterator` inherits from `IteratorBase` and additionally includes `attribute_groups()` (to be implemented).
- Renamed the former `MultiCfIterator` to `CoalescingIterator`, which inherits from `Iterator`.
- Split the existing `MultiCfIteratorTest` into two: `CoalescingIteratorTest` and `AttributeGroupIteratorTest`.
- Moved AttributeGroup related code from `wide_columns.h` to a new file, `attribute_groups.h`.
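
The interface split listed above can be pictured with a minimal sketch. It uses simplified stand-in types rather than the real RocksDB headers: `Column`, `Columns`, `AttributeGroup`, `KeyListIterator` and `CountKeys` are hypothetical names invented for illustration, and the real `IteratorBase` has more methods (`Seek`, `Prev`, `status`, and so on).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins; the real types live in RocksDB headers.
using Column = std::pair<std::string, std::string>;  // {name, value}
using Columns = std::vector<Column>;
struct AttributeGroup {
  std::string cf_name;
  Columns columns;
};
using AttributeGroups = std::vector<AttributeGroup>;

// Positioning and key access only: no accessor for the entry's payload.
class IteratorBase {
 public:
  virtual ~IteratorBase() = default;
  virtual bool Valid() const = 0;
  virtual void SeekToFirst() = 0;
  virtual void Next() = 0;
  virtual std::string key() const = 0;
};

// Adds the single-value and wide-column accessors.
class Iterator : public IteratorBase {
 public:
  virtual std::string value() const = 0;
  virtual const Columns& columns() const = 0;
};

// Adds the per-column-family accessor instead of value()/columns().
class AttributeGroupIterator : public IteratorBase {
 public:
  virtual const AttributeGroups& attribute_groups() const = 0;
};

// Toy Iterator over an in-memory list of keys, for illustration only.
class KeyListIterator : public Iterator {
 public:
  explicit KeyListIterator(std::vector<std::string> keys)
      : keys_(std::move(keys)), pos_(keys_.size()) {}
  bool Valid() const override { return pos_ < keys_.size(); }
  void SeekToFirst() override { pos_ = 0; }
  void Next() override {
    assert(Valid());
    ++pos_;
  }
  std::string key() const override { return keys_[pos_]; }
  std::string value() const override { return std::string(); }
  const Columns& columns() const override { return no_columns_; }

 private:
  std::vector<std::string> keys_;
  std::size_t pos_;  // index into keys_; keys_.size() means "not valid"
  Columns no_columns_;
};

// Code that only walks keys can accept either iterator flavor.
inline std::size_t CountKeys(IteratorBase& it) {
  std::size_t n = 0;
  for (it.SeekToFirst(); it.Valid(); it.Next()) {
    ++n;
  }
  return n;
}
```

In this shape, a caller holding an `AttributeGroupIterator` cannot call `value()` or `columns()` by accident, which is the point of moving those accessors out of the base class.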

Some implementation details:
- `MultiCfIteratorImpl` takes two functions, `populate_func` and `reset_func`, and uses them to populate `value_` and `columns_` in `CoalescingIterator` and `attribute_groups_` in `AttributeGroupIterator`. In `CoalescingIterator`, `populate_func` is `Coalesce()`; in `AttributeGroupIterator`, it is `AddToAttributeGroups()`. `reset_func` clears the populated `value_`, `columns_` and `attribute_groups_` accordingly.
- `Coalesce()` merge-sorts columns from multiple CFs when a key exists in more than one CF. A column that appears in a later CF overwrites the prior ones.
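
The callback wiring described in the first bullet can be sketched roughly as follows. This is a toy model under stated assumptions, not the code in `db/multi_cf_iterator_impl.h`: `SharedCore`, `GroupingWrapper`, `OnNewKey` and the CF-index-based `populate_func` signature are hypothetical, and the heap-driven positioning of the real class is replaced by a plain per-CF vector.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Column = std::pair<std::string, std::string>;  // {name, value}
using Columns = std::vector<Column>;

// Hypothetical stand-in for the shared implementation: it knows when the
// iterator lands on a new key, but not what the wrapper stores for that key.
class SharedCore {
 public:
  using ResetFunc = std::function<void()>;
  using PopulateFunc = std::function<void(std::size_t, const Columns&)>;

  SharedCore(ResetFunc reset_func, PopulateFunc populate_func)
      : reset_func_(std::move(reset_func)),
        populate_func_(std::move(populate_func)) {
    assert(reset_func_ && populate_func_);
  }

  // per_cf[i] holds the columns CF i has for the new key (empty if absent).
  void OnNewKey(const std::vector<Columns>& per_cf) {
    reset_func_();  // drop whatever the wrapper kept for the previous key
    for (std::size_t cf = 0; cf < per_cf.size(); ++cf) {
      if (!per_cf[cf].empty()) {
        populate_func_(cf, per_cf[cf]);
      }
    }
  }

 private:
  ResetFunc reset_func_;
  PopulateFunc populate_func_;
};

// Wrapper that keeps the columns grouped by CF, in the spirit of
// attribute groups. The lambdas capture `this`, so copying is disabled.
class GroupingWrapper {
 public:
  using Group = std::pair<std::size_t, Columns>;  // {CF index, columns}

  GroupingWrapper()
      : core_([this]() { groups_.clear(); },
              [this](std::size_t cf, const Columns& columns) {
                groups_.emplace_back(cf, columns);
              }) {}
  GroupingWrapper(const GroupingWrapper&) = delete;
  GroupingWrapper& operator=(const GroupingWrapper&) = delete;

  void OnNewKey(const std::vector<Columns>& per_cf) { core_.OnNewKey(per_cf); }
  const std::vector<Group>& groups() const { return groups_; }

 private:
  SharedCore core_;
  std::vector<Group> groups_;
};
```

A coalescing wrapper would pass a different `populate_func` (one that merges into a single column list) while reusing the same core, which is why the traversal logic only needs to exist once.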

As an example of `Coalesce()`, if CF1 has `"key_1" ==> {"col_1": "foo", "col_2": "baz"}` and CF2 has `"key_1" ==> {"col_2": "quux", "col_3": "bla"}`, then when the iterator is at `key_1`, `columns()` will return `{"col_1": "foo", "col_2": "quux", "col_3": "bla"}`.

In this example, `value()` will be empty, because neither CF has a value for `kDefaultColumnName`.
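
The merge in this example can be reproduced with a small standalone sketch. It mirrors the two-pointer merge in `CoalescingIterator::Coalesce()` but uses plain `std::string` pairs instead of `WideColumns`; `CoalesceColumns`, `DefaultValue` and `CommitMessageExample` are hypothetical helper names, and the sketch assumes the default column is the one with an empty name and that each input list is already sorted by column name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using Column = std::pair<std::string, std::string>;  // {name, value}
using Columns = std::vector<Column>;                 // sorted by name

// Merge two name-sorted column lists. On a name clash the column from
// `incoming` (the later CF) replaces the one from `base`.
Columns CoalesceColumns(const Columns& base, const Columns& incoming) {
  Columns out;
  out.reserve(base.size() + incoming.size());
  std::size_t i = 0;
  std::size_t j = 0;
  while (i < base.size() && j < incoming.size()) {
    const int cmp = base[i].first.compare(incoming[j].first);
    if (cmp < 0) {
      out.push_back(base[i++]);
    } else if (cmp > 0) {
      out.push_back(incoming[j++]);
    } else {
      out.push_back(incoming[j++]);  // later CF wins
      ++i;
    }
  }
  while (i < base.size()) {
    out.push_back(base[i++]);
  }
  while (j < incoming.size()) {
    out.push_back(incoming[j++]);
  }
  return out;
}

// Assumption: the default column has an empty name, so in a sorted list it
// can only be the first entry. Returns "" when there is no default column.
std::string DefaultValue(const Columns& columns) {
  if (!columns.empty() && columns.front().first.empty()) {
    return columns.front().second;
  }
  return std::string();
}

// The CF1/CF2 example from the commit message.
Columns CommitMessageExample() {
  const Columns cf1 = {Column{"col_1", "foo"}, Column{"col_2", "baz"}};
  const Columns cf2 = {Column{"col_2", "quux"}, Column{"col_3", "bla"}};
  Columns merged = CoalesceColumns(cf1, cf2);
  assert(merged.size() == 3);
  return merged;
}
```

Because both inputs are sorted, the merge is linear in the total number of columns, and the result stays sorted so it can serve as `base` when a third CF is coalesced.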

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12480

Test Plan:
## Unit Test
```
./multi_cf_iterator_test
```

## Performance Test

To make sure this change does not impact existing `Iterator` performance:

**Build**
```
$> make -j64 release
```
**Setup**
```
$> TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=1000000 -compression_type=none
```
**Run**
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="newiterator,seekrandom" -cache_size=10485760000
```

**Before the change**
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.519 micros/op 1927904 ops/sec 0.519 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       5.302 micros/op 188589 ops/sec 5.303 seconds 1000000 operations; (0 of 1000000 found)
```
**After the change**
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.497 micros/op 2011012 ops/sec 0.497 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       5.252 micros/op 190405 ops/sec 5.252 seconds 1000000 operations; (0 of 1000000 found)
```

Reviewed By: ltamasi

Differential Revision: D55353909

Pulled By: jaykorean

fbshipit-source-id: 8d7786ffee09e022261ce34aa60e8633685e1946
Jay Huh 2024-04-11 11:34:04 -07:00 committed by Facebook GitHub Bot
parent fab9dd9635
commit 58a98bded9
22 changed files with 923 additions and 506 deletions


@ -653,6 +653,7 @@ set(SOURCES
cache/sharded_cache.cc
cache/tiered_secondary_cache.cc
db/arena_wrapped_db_iter.cc
db/attribute_group_iterator_impl.cc
db/blob/blob_contents.cc
db/blob/blob_fetcher.cc
db/blob/blob_file_addition.cc
@ -669,6 +670,7 @@ set(SOURCES
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
db/coalescing_iterator.cc
db/column_family.cc
db/compaction/compaction.cc
db/compaction/compaction_iterator.cc
@ -715,7 +717,6 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/multi_cf_iterator.cc
db/output_validator.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc


@ -21,6 +21,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/sharded_cache.cc",
"cache/tiered_secondary_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/attribute_group_iterator_impl.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
@ -37,6 +38,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
"db/coalescing_iterator.cc",
"db/column_family.cc",
"db/compaction/compaction.cc",
"db/compaction/compaction_iterator.cc",
@ -83,7 +85,6 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/multi_cf_iterator.cc",
"db/output_validator.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",


@ -0,0 +1,17 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/attribute_group_iterator_impl.h"
namespace ROCKSDB_NAMESPACE {
const AttributeGroups kNoAttributeGroups;
void AttributeGroupIteratorImpl::AddToAttributeGroups(
ColumnFamilyHandle* /*cfh*/, const WideColumns& /*columns*/) {
// TODO - Implement AttributeGroup population
}
} // namespace ROCKSDB_NAMESPACE


@ -0,0 +1,84 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "db/multi_cf_iterator_impl.h"
#include "rocksdb/attribute_groups.h"
namespace ROCKSDB_NAMESPACE {
class AttributeGroupIteratorImpl : public AttributeGroupIterator {
public:
AttributeGroupIteratorImpl(
const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle* cfh, Iterator* iter) {
AddToAttributeGroups(cfh, iter->columns());
}) {}
~AttributeGroupIteratorImpl() override {}
// No copy allowed
AttributeGroupIteratorImpl(const AttributeGroupIteratorImpl&) = delete;
AttributeGroupIteratorImpl& operator=(const AttributeGroupIteratorImpl&) =
delete;
bool Valid() const override { return impl_.Valid(); }
void SeekToFirst() override { impl_.SeekToFirst(); }
void SeekToLast() override { impl_.SeekToLast(); }
void Seek(const Slice& target) override { impl_.Seek(target); }
void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); }
void Next() override { impl_.Next(); }
void Prev() override { impl_.Prev(); }
Slice key() const override { return impl_.key(); }
Status status() const override { return impl_.status(); }
const AttributeGroups& attribute_groups() const override {
assert(Valid());
return attribute_groups_;
}
void Reset() { attribute_groups_.clear(); }
private:
MultiCfIteratorImpl impl_;
AttributeGroups attribute_groups_;
void AddToAttributeGroups(ColumnFamilyHandle* cfh,
const WideColumns& columns);
};
class EmptyAttributeGroupIterator : public AttributeGroupIterator {
public:
explicit EmptyAttributeGroupIterator(const Status& s) : status_(s) {}
bool Valid() const override { return false; }
void Seek(const Slice& /*target*/) override {}
void SeekForPrev(const Slice& /*target*/) override {}
void SeekToFirst() override {}
void SeekToLast() override {}
void Next() override { assert(false); }
void Prev() override { assert(false); }
Slice key() const override {
assert(false);
return Slice();
}
Status status() const override { return status_; }
const AttributeGroups& attribute_groups() const override {
return kNoAttributeGroups;
}
private:
Status status_;
};
inline std::unique_ptr<AttributeGroupIterator> NewAttributeGroupErrorIterator(
const Status& status) {
return std::make_unique<EmptyAttributeGroupIterator>(status);
}
} // namespace ROCKSDB_NAMESPACE

db/coalescing_iterator.cc

@ -0,0 +1,47 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/coalescing_iterator.h"
#include "db/wide/wide_columns_helper.h"
namespace ROCKSDB_NAMESPACE {
void CoalescingIterator::Coalesce(const WideColumns& columns) {
WideColumns coalesced;
coalesced.reserve(wide_columns_.size() + columns.size());
auto base_col_iter = wide_columns_.begin();
auto new_col_iter = columns.begin();
while (base_col_iter != wide_columns_.end() &&
new_col_iter != columns.end()) {
auto comparison = base_col_iter->name().compare(new_col_iter->name());
if (comparison < 0) {
coalesced.push_back(*base_col_iter);
++base_col_iter;
} else if (comparison > 0) {
coalesced.push_back(*new_col_iter);
++new_col_iter;
} else {
coalesced.push_back(*new_col_iter);
++new_col_iter;
++base_col_iter;
}
}
while (base_col_iter != wide_columns_.end()) {
coalesced.push_back(*base_col_iter);
++base_col_iter;
}
while (new_col_iter != columns.end()) {
coalesced.push_back(*new_col_iter);
++new_col_iter;
}
wide_columns_.swap(coalesced);
if (WideColumnsHelper::HasDefaultColumn(wide_columns_)) {
value_ = WideColumnsHelper::GetDefaultColumn(wide_columns_);
}
}
} // namespace ROCKSDB_NAMESPACE

db/coalescing_iterator.h

@ -0,0 +1,61 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "db/multi_cf_iterator_impl.h"
namespace ROCKSDB_NAMESPACE {
// UNDER CONSTRUCTION - DO NOT USE
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle*, Iterator* iter) {
Coalesce(iter->columns());
}) {}
~CoalescingIterator() override {}
// No copy allowed
CoalescingIterator(const CoalescingIterator&) = delete;
CoalescingIterator& operator=(const CoalescingIterator&) = delete;
bool Valid() const override { return impl_.Valid(); }
void SeekToFirst() override { impl_.SeekToFirst(); }
void SeekToLast() override { impl_.SeekToLast(); }
void Seek(const Slice& target) override { impl_.Seek(target); }
void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); }
void Next() override { impl_.Next(); }
void Prev() override { impl_.Prev(); }
Slice key() const override { return impl_.key(); }
Status status() const override { return impl_.status(); }
Slice value() const override {
assert(Valid());
return value_;
}
const WideColumns& columns() const override {
assert(Valid());
return wide_columns_;
}
void Reset() {
value_.clear();
wide_columns_.clear();
}
private:
MultiCfIteratorImpl impl_;
Slice value_;
WideColumns wide_columns_;
void Coalesce(const WideColumns& columns);
};
} // namespace ROCKSDB_NAMESPACE


@ -26,7 +26,9 @@
#include <vector>
#include "db/arena_wrapped_db_iter.h"
#include "db/attribute_group_iterator_impl.h"
#include "db/builder.h"
#include "db/coalescing_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/convenience_impl.h"
#include "db/db_info_dumper.h"
@ -45,7 +47,6 @@
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/multi_cf_iterator.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
@ -3735,29 +3736,49 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
return db_iter;
}
std::unique_ptr<Iterator> DBImpl::NewMultiCfIterator(
std::unique_ptr<Iterator> DBImpl::NewCoalescingIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
return NewMultiCfIterator<Iterator, CoalescingIterator>(
_read_options, column_families, [](const Status& s) {
return std::unique_ptr<Iterator>(NewErrorIterator(s));
});
}
std::unique_ptr<AttributeGroupIterator> DBImpl::NewAttributeGroupIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
_read_options, column_families,
[](const Status& s) { return NewAttributeGroupErrorIterator(s); });
}
template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
ErrorIteratorFuncType error_iterator_func) {
if (column_families.size() == 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::InvalidArgument("No Column Family was provided")));
return error_iterator_func(
Status::InvalidArgument("No Column Family was provided"));
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs")));
return error_iterator_func(Status::InvalidArgument(
"Different comparators are being used across CFs"));
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<MultiCfIterator>(first_comparator, column_families,
std::move(child_iterators));
if (!s.ok()) {
return error_iterator_func(s);
}
return std::unique_ptr<Iterator>(NewErrorIterator(s));
return std::make_unique<ImplType>(column_families[0]->GetComparator(),
column_families,
std::move(child_iterators));
}
Status DBImpl::NewIterators(
@ -3834,8 +3855,8 @@ Status DBImpl::NewIterators(
}
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterators is overridden in
// WritePreparedTxnDB
// last_seq_same_as_publish_seq_==false since NewIterators is overridden
// in WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
@ -3959,8 +3980,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
std::shared_ptr<const SnapshotImpl> latest =
timestamped_snapshots_.GetSnapshot(std::numeric_limits<uint64_t>::max());
// If there is already a latest timestamped snapshot, then we need to do some
// checks.
// If there is already a latest timestamped snapshot, then we need to do
// some checks.
if (latest) {
uint64_t latest_snap_ts = latest->GetTimestamp();
SequenceNumber latest_snap_seq = latest->GetSequenceNumber();
@ -3969,8 +3990,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
Status status;
std::shared_ptr<const SnapshotImpl> ret;
if (latest_snap_ts > ts) {
// A snapshot created later cannot have smaller timestamp than a previous
// timestamped snapshot.
// A snapshot created later cannot have smaller timestamp than a
// previous timestamped snapshot.
needs_create_snap = false;
std::ostringstream oss;
oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > "
@ -4084,7 +4105,8 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
// Calculate a new threshold, skipping those CFs where compactions are
// scheduled. We do not do the same pass as the previous loop because
// mutex might be unlocked during the loop, making the result inaccurate.
// mutex might be unlocked during the loop, making the result
// inaccurate.
SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (CfdListContains(cf_scheduled, cfd) ||
@ -4512,7 +4534,8 @@ Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
sizes[i] = 0;
if (options.include_files) {
sizes[i] += versions_->ApproximateSize(
options, read_options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
options, read_options, v, k1.Encode(), k2.Encode(),
/*start_level=*/0,
/*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
}
if (options.include_memtables) {
@ -4797,9 +4820,9 @@ void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* sv = GetAndRefSuperVersion(cfd);
{
// Without mutex, Version::GetColumnFamilyMetaData will have data race with
// Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
// this may cause regression. An alternative is to make
// Without mutex, Version::GetColumnFamilyMetaData will have data race
// with Compaction::MarkFilesBeingCompacted. One solution is to use mutex,
// but this may cause regression. An alternative is to make
// FileMetaData::being_compacted atomic, but it will make FileMetaData
// non-copy-able. Another option is to separate these variables from
// original FileMetaData struct, and this requires re-organization of data


@ -50,6 +50,7 @@
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/attribute_groups.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
@ -352,8 +353,12 @@ class DBImpl : public DB {
void ReleaseSnapshot(const Snapshot* snapshot) override;
// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
std::unique_ptr<Iterator> NewMultiCfIterator(
std::unique_ptr<Iterator> NewCoalescingIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
@ -2411,6 +2416,13 @@ class DBImpl : public DB {
bool ShouldReferenceSuperVersion(const MergeContext& merge_context);
template <typename IterType, typename ImplType,
typename ErrorIteratorFuncType>
std::unique_ptr<IterType> NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
ErrorIteratorFuncType error_iterator_func);
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;


@ -3200,7 +3200,14 @@ class ModelDB : public DB {
}
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<Iterator> NewMultiCfIterator(
std::unique_ptr<Iterator> NewCoalescingIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
}
// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;


@ -1,102 +0,0 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/multi_cf_iterator.h"
#include <cassert>
namespace ROCKSDB_NAMESPACE {
template <typename BinaryHeap, typename ChildSeekFuncType>
void MultiCfIterator::SeekCommon(BinaryHeap& heap,
ChildSeekFuncType child_seek_func) {
heap.clear();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
heap.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
considerStatus(iter->status());
}
++i;
}
}
template <typename BinaryHeap, typename AdvanceFuncType>
void MultiCfIterator::AdvanceIterator(BinaryHeap& heap,
AdvanceFuncType advance_func) {
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = heap.top();
heap.pop();
if (!heap.empty()) {
auto* current = heap.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
advance_func(current);
if (current->Valid()) {
heap.replace_top(heap.top());
} else {
considerStatus(current->status());
heap.pop();
}
if (!heap.empty()) {
current = heap.top().iterator;
}
}
}
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
heap.push(top);
} else {
considerStatus(top.iterator->status());
}
}
void MultiCfIterator::SeekToFirst() {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(min_heap, [](Iterator* iter) { iter->SeekToFirst(); });
}
void MultiCfIterator::Seek(const Slice& target) {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(min_heap, [&target](Iterator* iter) { iter->Seek(target); });
}
void MultiCfIterator::SeekToLast() {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(max_heap, [](Iterator* iter) { iter->SeekToLast(); });
}
void MultiCfIterator::SeekForPrev(const Slice& target) {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(max_heap,
[&target](Iterator* iter) { iter->SeekForPrev(target); });
}
void MultiCfIterator::Next() {
assert(Valid());
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() {
Slice target = key();
InitMinHeap();
Seek(target);
});
AdvanceIterator(min_heap, [](Iterator* iter) { iter->Next(); });
}
void MultiCfIterator::Prev() {
assert(Valid());
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() {
Slice target = key();
InitMaxHeap();
SeekForPrev(target);
});
AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); });
}
} // namespace ROCKSDB_NAMESPACE


@ -1,159 +0,0 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <variant>
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
#include "util/overload.h"
namespace ROCKSDB_NAMESPACE {
// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column families, the iterator
// selects the value from the first column family containing the key, in the
// order provided in the `column_families` parameter.
class MultiCfIterator : public Iterator {
public:
MultiCfIterator(const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
~MultiCfIterator() override { status_.PermitUncheckedError(); }
// No copy allowed
MultiCfIterator(const MultiCfIterator&) = delete;
MultiCfIterator& operator=(const MultiCfIterator&) = delete;
private:
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
ReadOptions read_options_;
Status status_;
AttributeGroups attribute_groups_;
struct MultiCfIteratorInfo {
Iterator* iterator;
ColumnFamilyHandle* cfh;
int order;
};
template <typename CompareOp>
class MultiCfHeapItemComparator {
public:
explicit MultiCfHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}
bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
assert(b.iterator);
assert(a.iterator->Valid());
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : CompareOp()(c, 0);
}
private:
const Comparator* comparator_;
};
const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
using MultiCfMaxHeap = BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::less<int>>>;
using MultiCfIterHeap = std::variant<MultiCfMinHeap, MultiCfMaxHeap>;
MultiCfIterHeap heap_;
// TODO: Lower and Upper bounds
Iterator* current() const {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return max_heap.top().iterator;
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return min_heap.top().iterator;
}
Slice key() const override {
assert(Valid());
return current()->key();
}
Slice value() const override {
assert(Valid());
return current()->value();
}
const WideColumns& columns() const override {
assert(Valid());
return current()->columns();
}
bool Valid() const override {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return !max_heap.empty() && status_.ok();
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return !min_heap.empty() && status_.ok();
}
Status status() const override { return status_; }
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = std::move(s);
}
}
template <typename HeapType, typename InitFunc>
HeapType& GetHeap(InitFunc initFunc) {
if (!std::holds_alternative<HeapType>(heap_)) {
initFunc();
}
return std::get<HeapType>(heap_);
}
void InitMinHeap() {
heap_.emplace<MultiCfMinHeap>(
MultiCfHeapItemComparator<std::greater<int>>(comparator_));
}
void InitMaxHeap() {
heap_.emplace<MultiCfMaxHeap>(
MultiCfHeapItemComparator<std::less<int>>(comparator_));
}
template <typename BinaryHeap, typename ChildSeekFuncType>
void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func);
template <typename BinaryHeap, typename AdvanceFuncType>
void AdvanceIterator(BinaryHeap& heap, AdvanceFuncType advance_func);
void SeekToFirst() override;
void SeekToLast() override;
void Seek(const Slice& /*target*/) override;
void SeekForPrev(const Slice& /*target*/) override;
void Next() override;
void Prev() override;
};
} // namespace ROCKSDB_NAMESPACE

db/multi_cf_iterator_impl.h

@ -0,0 +1,278 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <functional>
#include <variant>
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
namespace ROCKSDB_NAMESPACE {
class MultiCfIteratorImpl {
public:
MultiCfIteratorImpl(
const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators,
std::function<void()> reset_func,
std::function<void(ColumnFamilyHandle*, Iterator*)> populate_func)
: comparator_(comparator),
heap_(MultiCfMinHeap(
MultiCfHeapItemComparator<std::greater<int>>(comparator_))),
reset_func_(std::move(reset_func)),
populate_func_(std::move(populate_func)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
~MultiCfIteratorImpl() { status_.PermitUncheckedError(); }
// No copy allowed
MultiCfIteratorImpl(const MultiCfIteratorImpl&) = delete;
MultiCfIteratorImpl& operator=(const MultiCfIteratorImpl&) = delete;
Slice key() const {
assert(Valid());
return current()->key();
}
bool Valid() const {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return !max_heap.empty() && status_.ok();
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return !min_heap.empty() && status_.ok();
}
Status status() const { return status_; }
void SeekToFirst() {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(
min_heap, [](Iterator* iter) { iter->SeekToFirst(); },
[](Iterator* iter) { iter->Next(); });
}
void Seek(const Slice& target) {
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() { InitMinHeap(); });
SeekCommon(
min_heap, [&target](Iterator* iter) { iter->Seek(target); },
[](Iterator* iter) { iter->Next(); });
}
void SeekToLast() {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(
max_heap, [](Iterator* iter) { iter->SeekToLast(); },
[](Iterator* iter) { iter->Prev(); });
}
void SeekForPrev(const Slice& target) {
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() { InitMaxHeap(); });
SeekCommon(
max_heap, [&target](Iterator* iter) { iter->SeekForPrev(target); },
[](Iterator* iter) { iter->Prev(); });
}
void Next() {
assert(Valid());
auto& min_heap = GetHeap<MultiCfMinHeap>([this]() {
Slice target = key();
InitMinHeap();
Seek(target);
});
AdvanceIterator(min_heap, [](Iterator* iter) { iter->Next(); });
}
void Prev() {
assert(Valid());
auto& max_heap = GetHeap<MultiCfMaxHeap>([this]() {
Slice target = key();
InitMaxHeap();
SeekForPrev(target);
});
AdvanceIterator(max_heap, [](Iterator* iter) { iter->Prev(); });
}
private:
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
Status status_;
struct MultiCfIteratorInfo {
Iterator* iterator;
int order;
};
template <typename CompareOp>
class MultiCfHeapItemComparator {
public:
explicit MultiCfHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}
bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
assert(b.iterator);
assert(a.iterator->Valid());
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : CompareOp()(c, 0);
}
private:
const Comparator* comparator_;
};
const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::greater<int>>>;
using MultiCfMaxHeap = BinaryHeap<MultiCfIteratorInfo,
MultiCfHeapItemComparator<std::less<int>>>;
using MultiCfIterHeap = std::variant<MultiCfMinHeap, MultiCfMaxHeap>;
MultiCfIterHeap heap_;
std::function<void()> reset_func_;
std::function<void(ColumnFamilyHandle*, Iterator*)> populate_func_;
// TODO: Lower and Upper bounds
Iterator* current() const {
if (std::holds_alternative<MultiCfMaxHeap>(heap_)) {
auto& max_heap = std::get<MultiCfMaxHeap>(heap_);
return max_heap.top().iterator;
}
auto& min_heap = std::get<MultiCfMinHeap>(heap_);
return min_heap.top().iterator;
}
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = std::move(s);
}
}
template <typename HeapType, typename InitFunc>
HeapType& GetHeap(InitFunc initFunc) {
if (!std::holds_alternative<HeapType>(heap_)) {
initFunc();
}
return std::get<HeapType>(heap_);
}
void InitMinHeap() {
heap_.emplace<MultiCfMinHeap>(
MultiCfHeapItemComparator<std::greater<int>>(comparator_));
}
void InitMaxHeap() {
heap_.emplace<MultiCfMaxHeap>(
MultiCfHeapItemComparator<std::less<int>>(comparator_));
}
template <typename BinaryHeap, typename ChildSeekFuncType,
typename AdvanceFuncType>
void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func,
AdvanceFuncType advance_func) {
reset_func_();
heap.clear();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& iter = cfh_iter_pair.second;
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
heap.push(MultiCfIteratorInfo{iter.get(), i});
} else {
considerStatus(iter->status());
if (!status_.ok()) {
// Non-OK status from the iterator. Bail out early
heap.clear();
break;
}
}
++i;
}
if (!heap.empty()) {
PopulateIterator(heap, advance_func);
}
}
template <typename BinaryHeap, typename AdvanceFuncType>
void AdvanceIterator(BinaryHeap& heap, AdvanceFuncType advance_func) {
assert(!heap.empty());
reset_func_();
// Because PopulateIterator() advances the same key in all non-top
// iterators, we do not have to do that here again. Just advance the top
// iterator and re-heapify
auto top = heap.top();
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
heap.replace_top(top);
} else {
considerStatus(top.iterator->status());
if (!status_.ok()) {
heap.clear();
return;
} else {
heap.pop();
}
}
if (!heap.empty()) {
PopulateIterator(heap, advance_func);
}
}
template <typename BinaryHeap, typename AdvanceFuncType>
void PopulateIterator(BinaryHeap& heap, AdvanceFuncType advance_func) {
// 1. Keep the top iterator (by popping it from the heap) and populate
// value, columns and attribute_groups
// 2. Make sure all others have iterated past the top iterator key slice.
// While iterating, coalesce/populate value, columns and attribute_groups
// 3. Add the top iterator back without advancing it
assert(!heap.empty());
auto top = heap.top();
auto& [top_cfh, top_iter] = cfh_iter_pairs_[top.order];
populate_func_(top_cfh, top_iter.get());
heap.pop();
if (!heap.empty()) {
auto* current = heap.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
auto& [curr_cfh, curr_iter] = cfh_iter_pairs_[heap.top().order];
populate_func_(curr_cfh, curr_iter.get());
advance_func(current);
if (current->Valid()) {
heap.replace_top(heap.top());
} else {
considerStatus(current->status());
if (!status_.ok()) {
// Non-OK status from the iterator. Bail out early
heap.clear();
break;
} else {
heap.pop();
}
}
if (!heap.empty()) {
current = heap.top().iterator;
}
}
}
heap.push(top);
}
};
} // namespace ROCKSDB_NAMESPACE
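
The heap handling above can be hard to follow in diff form. The following is a minimal, self-contained sketch of the same idea using only the standard library; it is not the RocksDB implementation. `SortedCf`, `Cursor`, `CursorGreater`, and `CoalesceScan` are hypothetical names. Each column family is modeled as a sorted vector of key/value strings with unique keys, and unlike `PopulateIterator()` the sketch advances every cursor positioned at the current key (including the top one) before emitting the entry.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one column family: (key, value) pairs sorted by
// key, with each key appearing at most once.
using SortedCf = std::vector<std::pair<std::string, std::string>>;

// Hypothetical stand-in for MultiCfIteratorInfo: a child position plus the
// index of its CF in the caller-supplied list.
struct Cursor {
  const SortedCf* cf;
  std::size_t pos;
  int order;
  const std::string& key() const {
    assert(pos < cf->size());
    return (*cf)[pos].first;
  }
  const std::string& value() const {
    assert(pos < cf->size());
    return (*cf)[pos].second;
  }
};

// With std::priority_queue this yields a min-heap on key; among equal keys
// the cursor with the lowest order ends up on top.
struct CursorGreater {
  bool operator()(const Cursor& a, const Cursor& b) const {
    int c = a.key().compare(b.key());
    return c == 0 ? a.order > b.order : c > 0;
  }
};

// Forward scan: one output entry per distinct key; for duplicate keys the
// value from the CF listed last wins.
SortedCf CoalesceScan(const std::vector<SortedCf>& cfs) {
  std::priority_queue<Cursor, std::vector<Cursor>, CursorGreater> heap;
  for (std::size_t i = 0; i < cfs.size(); ++i) {
    if (!cfs[i].empty()) {
      heap.push(Cursor{&cfs[i], 0, static_cast<int>(i)});
    }
  }
  SortedCf out;
  while (!heap.empty()) {
    const std::string key = heap.top().key();
    std::string value;
    // Cursors at `key` surface in increasing CF order, so the last
    // assignment comes from the latest CF.
    while (!heap.empty() && heap.top().key() == key) {
      Cursor cur = heap.top();
      heap.pop();
      value = cur.value();
      if (++cur.pos < cur.cf->size()) {
        heap.push(cur);
      }
    }
    out.emplace_back(key, value);
  }
  return out;
}
```

The tie-break on `order` is what makes the result depend on the CF list order: swapping the order of two CFs in the argument changes which value survives for a shared key, which matches the changed expectations in the `SimpleValues` test.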

@ -6,40 +6,31 @@
#include <memory>
#include "db/db_test_util.h"
#include "rocksdb/attribute_groups.h"
namespace ROCKSDB_NAMESPACE {
class MultiCfIteratorTest : public DBTestBase {
class CoalescingIteratorTest : public DBTestBase {
public:
MultiCfIteratorTest()
: DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {}
CoalescingIteratorTest()
: DBTestBase("coalescing_iterator_test", /*env_do_fsync=*/true) {}
// Verify Iteration of MultiCfIterator
// Verify Iteration of CoalescingIterator
// by SeekToFirst() + Next() and SeekToLast() + Prev()
void verifyMultiCfIterator(
const std::vector<ColumnFamilyHandle*>& cfhs,
void verifyCoalescingIterator(const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::optional<std::vector<Slice>>& expected_values = std::nullopt,
const std::optional<std::vector<WideColumns>>& expected_wide_columns =
std::nullopt,
const std::optional<std::vector<AttributeGroups>>&
expected_attribute_groups = std::nullopt) {
const std::vector<Slice>& expected_values,
const std::optional<std::vector<WideColumns>>&
expected_wide_columns = std::nullopt) {
int i = 0;
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs);
db_->NewCoalescingIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[i], iter->value());
}
ASSERT_EQ(expected_values[i], iter->value());
if (expected_wide_columns.has_value()) {
ASSERT_EQ(expected_wide_columns.value()[i], iter->columns());
}
if (expected_attribute_groups.has_value()) {
// TODO - Add this back when attribute_groups() API is added
// ASSERT_EQ(expected_attribute_groups.value()[i],
// iter->attribute_groups());
}
++i;
}
ASSERT_EQ(expected_keys.size(), i);
@ -48,17 +39,10 @@ class MultiCfIteratorTest : public DBTestBase {
int rev_i = i - 1;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_EQ(expected_keys[rev_i], iter->key());
if (expected_values.has_value()) {
ASSERT_EQ(expected_values.value()[rev_i], iter->value());
}
ASSERT_EQ(expected_values[rev_i], iter->value());
if (expected_wide_columns.has_value()) {
ASSERT_EQ(expected_wide_columns.value()[rev_i], iter->columns());
}
if (expected_attribute_groups.has_value()) {
// TODO - Add this back when attribute_groups() API is added
// ASSERT_EQ(expected_attribute_groups.value()[rev_i],
// iter->attribute_groups());
}
rev_i--;
}
ASSERT_OK(iter->status());
@ -78,20 +62,20 @@ class MultiCfIteratorTest : public DBTestBase {
}
};
TEST_F(MultiCfIteratorTest, InvalidArguments) {
TEST_F(CoalescingIteratorTest, InvalidArguments) {
Options options = GetDefaultOptions();
{
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
// Invalid - No CF is provided
std::unique_ptr<Iterator> iter_with_no_cf =
db_->NewMultiCfIterator(ReadOptions(), {});
db_->NewCoalescingIterator(ReadOptions(), {});
ASSERT_NOK(iter_with_no_cf->status());
ASSERT_TRUE(iter_with_no_cf->status().IsInvalidArgument());
}
}
TEST_F(MultiCfIteratorTest, SimpleValues) {
TEST_F(CoalescingIteratorTest, SimpleValues) {
Options options = GetDefaultOptions();
{
// Case 1: Unique key per CF
@ -109,19 +93,21 @@ TEST_F(MultiCfIteratorTest, SimpleValues) {
// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values);
verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys,
expected_values);
// Test for iteration over CF 3->1->default_cf->2
std::vector<ColumnFamilyHandle*> cfhs_order_3_1_0_2 = {
handles_[3], handles_[1], handles_[0], handles_[2]};
// Iteration order and the return values should be the same since keys are
// unique per CF
verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values);
verifyCoalescingIterator(cfhs_order_3_1_0_2, expected_keys,
expected_values);
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3);
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_1");
@ -136,7 +122,7 @@ TEST_F(MultiCfIteratorTest, SimpleValues) {
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3);
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("key_1");
@ -172,56 +158,58 @@ TEST_F(MultiCfIteratorTest, SimpleValues) {
// Test for iteration over CFs default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_0_val"};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values);
std::vector<Slice> expected_values = {"key_1_cf_3_val", "key_2_cf_2_val",
"key_3_cf_3_val"};
verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys,
expected_values);
// Test for iteration over CFs 3->2->default_cf->1
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1 = {
handles_[3], handles_[2], handles_[0], handles_[1]};
expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"};
verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values);
expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", "key_3_cf_1_val"};
verifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys,
expected_values);
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1);
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_3_2_0_1);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Seek("key_2");
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val");
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val");
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val");
iter->Seek("key_x");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1);
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_3_2_0_1);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("key_1");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val");
ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val");
iter->SeekForPrev("key_x");
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val");
ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}
}
TEST_F(MultiCfIteratorTest, EmptyCfs) {
TEST_F(CoalescingIteratorTest, EmptyCfs) {
Options options = GetDefaultOptions();
{
// Case 1: No keys in any of the CFs
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
db_->NewCoalescingIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekToLast();
@ -237,7 +225,7 @@ TEST_F(MultiCfIteratorTest, EmptyCfs) {
// Case 2: A single key exists in only one of the CFs. The other CFs are empty.
ASSERT_OK(Put(1, "key_1", "key_1_cf_1_val"));
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
db_->NewCoalescingIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_1_val");
iter->Next();
@ -251,21 +239,21 @@ TEST_F(MultiCfIteratorTest, EmptyCfs) {
// Case 3: same key exists in all of the CFs except one (cf_2)
ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
// handles_ are in the order of 0->1->2->3. We should expect value from cf_0
// handles_ are in the order of 0->1->2->3
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
db_->NewCoalescingIterator(ReadOptions(), handles_);
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val");
ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
}
}
TEST_F(MultiCfIteratorTest, WideColumns) {
TEST_F(CoalescingIteratorTest, WideColumns) {
// Set up the DB and Column Families
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
@ -274,30 +262,65 @@ TEST_F(MultiCfIteratorTest, WideColumns) {
WideColumns key_1_columns_in_cf_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"},
{"cf_overlap_col_name", "cf_2_overlap_value_key_1"}};
WideColumns key_1_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"},
{"cf_overlap_col_name", "cf_3_overlap_value_key_1"}};
WideColumns key_1_expected_columns_cfh_order_2_3{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"},
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"},
{"cf_overlap_col_name", "cf_3_overlap_value_key_1"}};
WideColumns key_1_expected_columns_cfh_order_3_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"},
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"},
{"cf_overlap_col_name", "cf_2_overlap_value_key_1"}};
constexpr char key_2[] = "key_2";
WideColumns key_2_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
{"cf_overlap_col_name", "cf_1_overlap_value_key_2"}};
WideColumns key_2_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"},
{"cf_overlap_col_name", "cf_2_overlap_value_key_2"}};
WideColumns key_2_expected_columns_cfh_order_1_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"},
{"cf_overlap_col_name", "cf_2_overlap_value_key_2"}};
WideColumns key_2_expected_columns_cfh_order_2_1{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"},
{"cf_overlap_col_name", "cf_1_overlap_value_key_2"}};
constexpr char key_3[] = "key_3";
WideColumns key_3_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
WideColumns key_3_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};
WideColumns key_3_expected_columns{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"},
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"},
};
constexpr char key_4[] = "key_4";
WideColumns key_4_columns_in_cf_0{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
WideColumns key_4_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};
WideColumns key_4_expected_columns{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"},
};
// Use AttributeGroup PutEntity API to insert them together
AttributeGroups key_1_attribute_groups{
@ -318,31 +341,47 @@ TEST_F(MultiCfIteratorTest, WideColumns) {
ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));
// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
// Keys should be returned in order regardless of cfh order
std::vector<Slice> expected_keys = {key_1, key_2, key_3, key_4};
// Pick what DBIter would return for value() in the first CF that key exists
// Since value for kDefaultWideColumnName only exists for key_1, rest will
// return empty value
// return an empty value after coalescing
std::vector<Slice> expected_values = {"cf_2_col_val_0_key_1", "", "", ""};
// Pick columns from the first CF that the key exists and value is stored as
// wide column
std::vector<WideColumns> expected_wide_columns = {
{{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}},
{{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}};
verifyMultiCfIterator(cfhs_order_0_1_2_3, expected_keys, expected_values,
expected_wide_columns);
// Test for iteration over CF default->1->2->3
{
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
// Coalesced columns
std::vector<WideColumns> expected_wide_columns_0_1_2_3 = {
key_1_expected_columns_cfh_order_2_3,
key_2_expected_columns_cfh_order_1_2, key_3_expected_columns,
key_4_expected_columns};
verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys, expected_values,
expected_wide_columns_0_1_2_3);
}
TEST_F(MultiCfIteratorTest, DifferentComparatorsInMultiCFs) {
// Test for iteration over CF 3->2->default->1
{
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1 = {
handles_[3], handles_[2], handles_[0], handles_[1]};
// Coalesced columns
std::vector<WideColumns> expected_wide_columns_3_2_0_1 = {
key_1_expected_columns_cfh_order_3_2,
key_2_expected_columns_cfh_order_2_1, key_3_expected_columns,
key_4_expected_columns};
verifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys, expected_values,
expected_wide_columns_3_2_0_1);
}
}
TEST_F(CoalescingIteratorTest, DifferentComparatorsInMultiCFs) {
// This test creates two column families with two different comparators.
// Attempting to create the MultiCFIterator should fail.
// Attempting to create the CoalescingIterator should fail.
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
@ -362,15 +401,15 @@ TEST_F(MultiCfIteratorTest, DifferentComparatorsInMultiCFs) {
verifyExpectedKeys(handles_[1], {"key_3", "key_2", "key_1"});
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
db_->NewCoalescingIterator(ReadOptions(), handles_);
ASSERT_NOK(iter->status());
ASSERT_TRUE(iter->status().IsInvalidArgument());
}
TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) {
TEST_F(CoalescingIteratorTest, CustomComparatorsInMultiCFs) {
// This test creates two column families with the same custom test
// comparators (but instantiated independently). Attempting to create the
// MultiCFIterator should not fail.
// CoalescingIterator should not fail.
Options options = GetDefaultOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
@ -410,12 +449,12 @@ TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) {
std::vector<Slice> expected_keys = {
"key_001_003", "key_001_002", "key_001_001", "key_002_003", "key_002_002",
"key_002_001", "key_003_006", "key_003_005", "key_003_004"};
std::vector<Slice> expected_values = {"value_0_1", "value_0_2", "value_0_3",
std::vector<Slice> expected_values = {"value_1_1", "value_1_2", "value_1_3",
"value_0_4", "value_0_5", "value_0_6",
"value_1_4", "value_1_5", "value_1_6"};
int i = 0;
std::unique_ptr<Iterator> iter =
db_->NewMultiCfIterator(ReadOptions(), handles_);
db_->NewCoalescingIterator(ReadOptions(), handles_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
ASSERT_EQ(expected_values[i], iter->value());
@ -424,7 +463,40 @@ TEST_F(MultiCfIteratorTest, CustomComparatorsInMultiCFs) {
ASSERT_OK(iter->status());
}
TEST_F(MultiCfIteratorTest, DISABLED_IterateAttributeGroups) {
class AttributeGroupIteratorTest : public DBTestBase {
public:
AttributeGroupIteratorTest()
: DBTestBase("attribute_group_iterator_test", /*env_do_fsync=*/true) {}
// TODO - Verify AttributeGroup Iterator
void verifyAttributeGroupIterator(
const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::vector<AttributeGroups>& /*expected_attribute_groups*/) {
int i = 0;
std::unique_ptr<AttributeGroupIterator> iter =
db_->NewAttributeGroupIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
// TODO - uncomment this after implementing attribute_groups()
// ASSERT_EQ(expected_attribute_groups[i], iter->attribute_groups());
++i;
}
ASSERT_EQ(expected_keys.size(), i);
ASSERT_OK(iter->status());
int rev_i = i - 1;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_EQ(expected_keys[rev_i], iter->key());
// TODO - uncomment this after implementing attribute_groups()
// ASSERT_EQ(expected_attribute_groups[rev_i], iter->attribute_groups());
rev_i--;
}
ASSERT_OK(iter->status());
}
};
TEST_F(AttributeGroupIteratorTest, IterateAttributeGroups) {
// Set up the DB and Column Families
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
@ -483,9 +555,8 @@ TEST_F(MultiCfIteratorTest, DISABLED_IterateAttributeGroups) {
std::vector<AttributeGroups> expected_attribute_groups = {
key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups,
key_4_attribute_groups};
verifyMultiCfIterator(
cfhs_order_0_1_2_3, expected_keys, std::nullopt /* expected_values */,
std::nullopt /* expected_wide_columns */, expected_attribute_groups);
verifyAttributeGroupIterator(cfhs_order_0_1_2_3, expected_keys,
expected_attribute_groups);
}
} // namespace ROCKSDB_NAMESPACE

@ -4,7 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/wide_columns.h"
#include "db/wide/wide_column_serialization.h"
namespace ROCKSDB_NAMESPACE {
@ -12,7 +11,6 @@ namespace ROCKSDB_NAMESPACE {
const Slice kDefaultWideColumnName;
const WideColumns kNoWideColumns;
const AttributeGroups kNoAttributeGroups;
Status PinnableWideColumns::CreateIndexForWideColumns() {
Slice value_copy = value_;

@ -0,0 +1,95 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/iterator_base.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle;
// Class representing attribute group. Attribute group is a logical grouping of
// wide-column entities by leveraging Column Families.
// Used in Write Path
class AttributeGroup {
public:
explicit AttributeGroup(ColumnFamilyHandle* column_family,
const WideColumns& columns)
: column_family_(column_family), columns_(columns) {}
ColumnFamilyHandle* column_family() const { return column_family_; }
const WideColumns& columns() const { return columns_; }
WideColumns& columns() { return columns_; }
private:
ColumnFamilyHandle* column_family_;
WideColumns columns_;
};
inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) {
return lhs.column_family() == rhs.column_family() &&
lhs.columns() == rhs.columns();
}
// A collection of Attribute Groups.
using AttributeGroups = std::vector<AttributeGroup>;
// An empty set of Attribute Groups.
extern const AttributeGroups kNoAttributeGroups;
// Used in Read Path. Wide-columns returned from the query are pinnable.
class PinnableAttributeGroup {
public:
explicit PinnableAttributeGroup(ColumnFamilyHandle* column_family)
: column_family_(column_family), status_(Status::OK()) {}
ColumnFamilyHandle* column_family() const { return column_family_; }
const Status& status() const { return status_; }
const WideColumns& columns() const { return columns_.columns(); }
void SetStatus(const Status& status);
void SetColumns(PinnableWideColumns&& columns);
void Reset();
private:
ColumnFamilyHandle* column_family_;
Status status_;
PinnableWideColumns columns_;
};
inline void PinnableAttributeGroup::SetStatus(const Status& status) {
status_ = status;
}
inline void PinnableAttributeGroup::SetColumns(PinnableWideColumns&& columns) {
columns_ = std::move(columns);
}
inline void PinnableAttributeGroup::Reset() {
SetStatus(Status::OK());
columns_.Reset();
}
// A collection of Pinnable Attribute Groups.
using PinnableAttributeGroups = std::vector<PinnableAttributeGroup>;
// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator that collects and returns attribute groups for
// each key in order provided by comparator
class AttributeGroupIterator : public IteratorBase {
public:
AttributeGroupIterator() {}
~AttributeGroupIterator() override {}
// No copy allowed
AttributeGroupIterator(const AttributeGroupIterator&) = delete;
AttributeGroupIterator& operator=(const AttributeGroupIterator&) = delete;
virtual const AttributeGroups& attribute_groups() const = 0;
};
} // namespace ROCKSDB_NAMESPACE

@ -17,6 +17,7 @@
#include <unordered_map>
#include <vector>
#include "rocksdb/attribute_groups.h"
#include "rocksdb/block_cache_trace_writer.h"
#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
@ -973,10 +974,26 @@ class DB {
// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
// When the same key is present in multiple column families, the iterator
// selects the value or columns from the first column family containing the
// key, in the order specified by the `column_families` parameter.
virtual std::unique_ptr<Iterator> NewMultiCfIterator(
//
// If a key exists in more than one column family, value() will be determined
// by the wide column value of kDefaultWideColumnName after coalescing, as
// described below.
//
// Each wide column is shadowed independently: when the same column name
// appears in more than one CF, the value from the CF listed later in
// `column_families` wins.
// For example, if CF1 has "key_1" ==> {"col_1": "foo", "col_2": "baz"} and
// CF2 has "key_1" ==> {"col_2": "quux", "col_3": "bla"}, then when the
// iterator is at key_1, columns() will return
// {"col_1": "foo", "col_2": "quux", "col_3": "bla"}.
// In this example, value() will be empty, because neither CF has a value
// for kDefaultWideColumnName.
virtual std::unique_ptr<Iterator> NewCoalescingIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator that collects and returns attribute groups
// for each key in order provided by comparator
virtual std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
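
The column-shadowing rule documented for `NewCoalescingIterator` can be illustrated without RocksDB. The sketch below is an assumption-laden model, not the library's `Coalesce()`: `Columns`, `CoalesceColumns`, and `DefaultValue` are hypothetical names, columns are plain string pairs, and the default column is modeled as the column with an empty name (in this diff `kDefaultWideColumnName` is a default-constructed `Slice`, which is empty).

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for WideColumns: (name, value) pairs sorted by name.
using Columns = std::vector<std::pair<std::string, std::string>>;

// Merge the columns of one key across CFs. CFs are visited in list order,
// so a column from a later CF overwrites the same-named column from an
// earlier one. std::map keeps the result sorted by column name.
Columns CoalesceColumns(const std::vector<Columns>& per_cf_columns) {
  std::map<std::string, std::string> merged;
  for (const Columns& columns : per_cf_columns) {
    for (const auto& column : columns) {
      merged[column.first] = column.second;
    }
  }
  return Columns(merged.begin(), merged.end());
}

// Model of value(): the value of the default (empty-named) column if the
// coalesced entity has one, otherwise an empty string. The empty name sorts
// first, so only the front element needs to be checked.
std::string DefaultValue(const Columns& coalesced) {
  if (!coalesced.empty() && coalesced.front().first.empty()) {
    return coalesced.front().second;
  }
  return std::string();
}
```

Feeding it the CF1/CF2 example from the comment yields `col_1`, `col_2`, `col_3` with `col_2` taken from CF2, and an empty `DefaultValue`; reversing the CF order makes `col_2` come from CF1 instead.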

@ -20,14 +20,12 @@
#include <string>
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/iterator_base.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
class Iterator : public Cleanable {
class Iterator : public IteratorBase {
public:
Iterator() {}
// No copying allowed
@ -36,51 +34,6 @@ class Iterator : public Cleanable {
virtual ~Iterator() {}
// An iterator is either positioned at a key/value pair, or
// not valid. This method returns true iff the iterator is valid.
// Always returns false if !status().ok().
virtual bool Valid() const = 0;
// Position at the first key in the source. The iterator is Valid()
// after this call iff the source is not empty.
virtual void SeekToFirst() = 0;
// Position at the last key in the source. The iterator is
// Valid() after this call iff the source is not empty.
virtual void SeekToLast() = 0;
// Position at the first key in the source that is at or past target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
// Target does not contain timestamp.
virtual void Seek(const Slice& target) = 0;
// Position at the last key in the source that is at or before target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target.
// Target does not contain timestamp.
virtual void SeekForPrev(const Slice& target) = 0;
// Moves to the next entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the last entry in the source.
// REQUIRES: Valid()
virtual void Next() = 0;
// Moves to the previous entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the first entry in source.
// REQUIRES: Valid()
virtual void Prev() = 0;
// Return the key for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev
// operation).
// REQUIRES: Valid()
virtual Slice key() const = 0;
// Return the value for the current entry. If the entry is a plain key-value,
// return the value as-is; if it is a wide-column entity, return the value of
// the default anonymous column (see kDefaultWideColumnName) if any, or an
@ -102,11 +55,6 @@ class Iterator : public Cleanable {
return kNoWideColumns;
}
// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
// If supported, the DB state that the iterator reads from is updated to
// the latest state. The iterator will be invalidated after the call.
// Regardless of whether the iterator was created/refreshed previously

@ -0,0 +1,74 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class IteratorBase : public Cleanable {
public:
IteratorBase() {}
// No copying allowed
IteratorBase(const IteratorBase&) = delete;
void operator=(const IteratorBase&) = delete;
virtual ~IteratorBase() {}
// An iterator is either positioned at a key/value pair, or
// not valid. This method returns true iff the iterator is valid.
// Always returns false if !status().ok().
virtual bool Valid() const = 0;
// Position at the first key in the source. The iterator is Valid()
// after this call iff the source is not empty.
virtual void SeekToFirst() = 0;
// Position at the last key in the source. The iterator is
// Valid() after this call iff the source is not empty.
virtual void SeekToLast() = 0;
// Position at the first key in the source that is at or past target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
// Target does not contain timestamp.
virtual void Seek(const Slice& target) = 0;
// Position at the last key in the source that is at or before target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target.
// Target does not contain timestamp.
virtual void SeekForPrev(const Slice& target) = 0;
// Moves to the next entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the last entry in the source.
// REQUIRES: Valid()
virtual void Next() = 0;
// Moves to the previous entry in the source. After this call, Valid() is
// true iff the iterator was not positioned at the first entry in source.
// REQUIRES: Valid()
virtual void Prev() = 0;
// Return the key for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev
// operation).
// REQUIRES: Valid()
virtual Slice key() const = 0;
// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
};
} // namespace ROCKSDB_NAMESPACE
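
To show why the split into `IteratorBase`, `Iterator`, and `AttributeGroupIterator` is useful, here is a simplified sketch of the hierarchy with toy types. Everything in the `sketch` namespace is hypothetical and much smaller than the real interfaces (no `Cleanable`, `Slice`, `Status`, or backward iteration); the `Groups` alias and `VectorIterator` class are illustrations only.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Positioning and key access only, mirroring the role of IteratorBase.
class IteratorBase {
 public:
  virtual ~IteratorBase() = default;
  virtual bool Valid() const = 0;
  virtual void SeekToFirst() = 0;
  virtual void Next() = 0;
  virtual std::string key() const = 0;
};

// Adds the single-value accessor, mirroring the role of Iterator.
class Iterator : public IteratorBase {
 public:
  virtual std::string value() const = 0;
};

// Adds a per-CF view instead of value(), mirroring the role of
// AttributeGroupIterator. A group is reduced to a (cf_name, value) pair.
using Groups = std::vector<std::pair<std::string, std::string>>;
class AttributeGroupIterator : public IteratorBase {
 public:
  virtual const Groups& attribute_groups() const = 0;
};

// Toy Iterator over an in-memory vector of (key, value) rows. It starts out
// invalid, like an iterator that has not been positioned yet.
class VectorIterator : public Iterator {
 public:
  using Rows = std::vector<std::pair<std::string, std::string>>;
  explicit VectorIterator(Rows rows)
      : rows_(std::move(rows)), pos_(rows_.size()) {}
  bool Valid() const override { return pos_ < rows_.size(); }
  void SeekToFirst() override { pos_ = 0; }
  void Next() override { ++pos_; }
  std::string key() const override { return rows_[pos_].first; }
  std::string value() const override { return rows_[pos_].second; }

 private:
  Rows rows_;
  std::size_t pos_;
};

// Needs only the base interface, so it accepts either iterator flavor.
inline std::vector<std::string> CollectKeys(IteratorBase& iter) {
  std::vector<std::string> keys;
  for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
    keys.push_back(iter.key());
  }
  return keys;
}

}  // namespace sketch
```

Code that only walks keys can take the base type, while code that calls `value()` or `attribute_groups()` has to name the specific derived interface, so mixing up the two access styles becomes a compile-time error rather than a runtime surprise.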

@ -259,11 +259,18 @@ class StackableDB : public DB {
 return db_->NewIterators(options, column_families, iterators);
 }
-using DB::NewMultiCfIterator;
-std::unique_ptr<Iterator> NewMultiCfIterator(
+using DB::NewCoalescingIterator;
+std::unique_ptr<Iterator> NewCoalescingIterator(
 const ReadOptions& options,
 const std::vector<ColumnFamilyHandle*>& column_families) override {
-return db_->NewMultiCfIterator(options, column_families);
+return db_->NewCoalescingIterator(options, column_families);
 }
+using DB::NewAttributeGroupIterator;
+std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
+const ReadOptions& options,
+const std::vector<ColumnFamilyHandle*>& column_families) override {
+return db_->NewAttributeGroupIterator(options, column_families);
+}
 const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }


@ -220,69 +220,5 @@ inline bool operator!=(const PinnableWideColumns& lhs,
 return !(lhs == rhs);
 }
-// Class representing attribute group. Attribute group is a logical grouping of
-// wide-column entities by leveraging Column Families.
-// Used in Write Path
-class AttributeGroup {
-public:
-ColumnFamilyHandle* column_family() const { return column_family_; }
-const WideColumns& columns() const { return columns_; }
-WideColumns& columns() { return columns_; }
-explicit AttributeGroup(ColumnFamilyHandle* column_family,
-const WideColumns& columns)
-: column_family_(column_family), columns_(columns) {}
-private:
-ColumnFamilyHandle* column_family_;
-WideColumns columns_;
-};
-inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) {
-return lhs.column_family() == rhs.column_family() &&
-lhs.columns() == rhs.columns();
-}
-// A collection of Attribute Groups.
-using AttributeGroups = std::vector<AttributeGroup>;
-// An empty set of Attribute Groups.
-extern const AttributeGroups kNoAttributeGroups;
-// Used in Read Path. Wide-columns returned from the query are pinnable.
-class PinnableAttributeGroup {
-public:
-ColumnFamilyHandle* column_family() const { return column_family_; }
-const Status& status() const { return status_; }
-const WideColumns& columns() const { return columns_.columns(); }
-explicit PinnableAttributeGroup(ColumnFamilyHandle* column_family)
-: column_family_(column_family), status_(Status::OK()) {}
-void SetStatus(const Status& status);
-void SetColumns(PinnableWideColumns&& columns);
-void Reset();
-private:
-ColumnFamilyHandle* column_family_;
-Status status_;
-PinnableWideColumns columns_;
-};
-inline void PinnableAttributeGroup::SetStatus(const Status& status) {
-status_ = status;
-}
-inline void PinnableAttributeGroup::SetColumns(PinnableWideColumns&& columns) {
-columns_ = std::move(columns);
-}
-inline void PinnableAttributeGroup::Reset() {
-SetStatus(Status::OK());
-columns_.Reset();
-}
-// A collection of Pinnable Attribute Groups.
-using PinnableAttributeGroups = std::vector<PinnableAttributeGroup>;
 } // namespace ROCKSDB_NAMESPACE
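
As a rough illustration of the `AttributeGroup` shape in this hunk, the sketch below pairs a column family handle with its columns and compares groups the same way the `operator==` in the diff does: by handle identity plus column contents. All `Fake*` names are hypothetical stand-ins invented for this example; the real `ColumnFamilyHandle` and `WideColumns` types are richer.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the RocksDB types (assumptions for illustration).
struct FakeColumnFamilyHandle {
  std::string name;
};
using FakeWideColumns = std::vector<std::pair<std::string, std::string>>;

// Mirrors the shape of AttributeGroup: a column family plus its columns.
class FakeAttributeGroup {
 public:
  FakeAttributeGroup(FakeColumnFamilyHandle* column_family,
                     FakeWideColumns columns)
      : column_family_(column_family), columns_(std::move(columns)) {}

  FakeColumnFamilyHandle* column_family() const { return column_family_; }
  const FakeWideColumns& columns() const { return columns_; }

 private:
  FakeColumnFamilyHandle* column_family_;
  FakeWideColumns columns_;
};

// Same comparison as in the diff: the handle pointers must be identical and
// the column contents must be equal.
inline bool operator==(const FakeAttributeGroup& lhs,
                       const FakeAttributeGroup& rhs) {
  return lhs.column_family() == rhs.column_family() &&
         lhs.columns() == rhs.columns();
}
```

Two groups holding identical columns therefore compare unequal when they point at different column family handles, which is what lets a collection of groups keep per-family data apart for the same key.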


@ -10,8 +10,8 @@
 #include <cstddef>
+#include "rocksdb/attribute_groups.h"
 #include "rocksdb/rocksdb_namespace.h"
-#include "rocksdb/wide_columns.h"
 namespace ROCKSDB_NAMESPACE {

src.mk

@ -14,6 +14,7 @@ LIB_SOURCES = \
 cache/sharded_cache.cc \
 cache/tiered_secondary_cache.cc \
 db/arena_wrapped_db_iter.cc \
+db/attribute_group_iterator_impl.cc \
 db/blob/blob_contents.cc \
 db/blob/blob_fetcher.cc \
 db/blob/blob_file_addition.cc \
@ -30,6 +31,7 @@ LIB_SOURCES = \
 db/blob/prefetch_buffer_collection.cc \
 db/builder.cc \
 db/c.cc \
+db/coalescing_iterator.cc \
 db/column_family.cc \
 db/compaction/compaction.cc \
 db/compaction/compaction_iterator.cc \
@ -76,7 +78,6 @@ LIB_SOURCES = \
 db/memtable_list.cc \
 db/merge_helper.cc \
 db/merge_operator.cc \
-db/multi_cf_iterator.cc \
 db/output_validator.cc \
 db/periodic_task_scheduler.cc \
 db/range_del_aggregator.cc \