Access DBImpl* and CFD* by CFHImpl* in Iterators (#12395)

Summary:
In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR https://github.com/facebook/rocksdb/issues/12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs:  https://github.com/facebook/rocksdb/issues/11925 #11943, and https://github.com/facebook/rocksdb/issues/11977.

To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12395

Test Plan:
# Summary

In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR #12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs:  #11925 #11943, and #11977.

To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`.

# Test Plan

There should be no behavior changes. Existing tests and CI for the correctness tests.

**Test for Perf Regression**
Build
```
$> make -j64 release
```
Setup
```
$> TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=1000000 -compression_type=none
```
Run
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="newiterator,seekrandom" -cache_size=10485760000
```

Before the change
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.552 micros/op 1810157 ops/sec 0.552 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       4.502 micros/op 222143 ops/sec 4.502 seconds 1000000 operations; (0 of 1000000 found)
```
After the change
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.520 micros/op 1924401 ops/sec 0.520 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       4.532 micros/op 220657 ops/sec 4.532 seconds 1000000 operations; (0 of 1000000 found)
```

Reviewed By: pdillinger

Differential Revision: D54332713

Pulled By: jaykorean

fbshipit-source-id: b28d897ad519e58b1ca82eb068a6319544a4fae5
This commit is contained in:
Jay Huh 2024-03-01 10:28:20 -08:00 committed by Facebook GitHub Bot
parent 5bcc184975
commit c00c16855d
15 changed files with 143 additions and 146 deletions

View File

@ -43,14 +43,13 @@ void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ =
new (mem) DBIter(env, read_options, ioptions, mutable_cf_options,
ioptions.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
db_iter_ = new (mem) DBIter(
env, read_options, ioptions, mutable_cf_options, ioptions.user_comparator,
/* iter */ nullptr, version, sequence, true,
max_sequential_skip_in_iteration, read_callback, cfh, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
@ -65,40 +64,44 @@ void ArenaWrappedDBIter::Init(
Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
if (cfh_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
auto cfd = cfh_->cfd();
auto db_impl = cfh_->db();
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
uint64_t cur_sv_number = cfd->GetSuperVersionNumber();
// If we recreate a new internal iterator below (NewInternalIterator()),
// we will pass in read_options_. We need to make sure it
// has the right snapshot.
read_options_.snapshot = snapshot;
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
auto reinit_internal_iter = [&]() {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
assert(sv->version_number >= cur_sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
Init(env, read_options_, *(cfd->ioptions()), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, read_seq,
InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
@ -107,10 +110,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
reinit_internal_iter();
break;
} else {
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_impl);
TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, read_seq, false /* immutable_memtable */);
@ -123,13 +126,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
// will be freed during db_iter destruction there.
if (memtable_range_tombstone_iter_) {
assert(!*memtable_range_tombstone_iter_ ||
sv_number_ != cfd_->GetSuperVersionNumber());
sv_number_ != cfd->GetSuperVersionNumber());
}
delete t;
} else { // current mutable memtable has range tombstones
if (!memtable_range_tombstone_iter_) {
delete t;
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
// The memtable under DBIter did not have range tombstone before
// refresh.
reinit_internal_iter();
@ -138,13 +141,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
delete *memtable_range_tombstone_iter_;
*memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
&cfd_->internal_comparator(), nullptr, nullptr);
&cfd->internal_comparator(), nullptr, nullptr);
}
}
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
}
// Check again if the latest super version number is changed
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
uint64_t latest_sv_number = cfd->GetSuperVersionNumber();
if (latest_sv_number != cur_sv_number) {
// If the super version number is changed after refreshing,
// fallback to Re-Init the InternalIterator
@ -163,14 +166,14 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
cfh, expose_blob_index, allow_refresh);
if (cfh != nullptr && allow_refresh) {
iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
}
return iter;

View File

@ -87,15 +87,14 @@ class ArenaWrappedDBIter : public Iterator {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh);
// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
void StoreRefreshInfo(ColumnFamilyHandleImpl* cfh,
ReadCallback* read_callback, bool expose_blob_index) {
db_impl_ = db_impl;
cfd_ = cfd;
cfh_ = cfh;
read_callback_ = read_callback;
expose_blob_index_ = expose_blob_index;
}
@ -104,8 +103,7 @@ class ArenaWrappedDBIter : public Iterator {
DBIter* db_iter_ = nullptr;
Arena arena_;
uint64_t sv_number_;
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ColumnFamilyHandleImpl* cfh_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool expose_blob_index_ = false;
@ -116,13 +114,13 @@ class ArenaWrappedDBIter : public Iterator {
};
// Generate the arena wrapped iterator class.
// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not
// `cfh` is used for reneweal. If left null, renewal will not
// be supported.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false, bool allow_refresh = true);
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false,
bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE

View File

@ -45,10 +45,10 @@ class DBBlobIndexTest : public DBTestBase {
DBBlobIndexTest() : DBTestBase("db_blob_index_test", /*env_do_fsync=*/true) {}
ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }
ColumnFamilyData* cfd() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh())->cfd();
ColumnFamilyHandleImpl* cfh_impl() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh());
}
ColumnFamilyData* cfd() { return cfh_impl()->cfd(); }
Status PutBlobIndex(WriteBatch* batch, const Slice& key,
const Slice& blob_index) {
@ -96,11 +96,9 @@ class DBBlobIndexTest : public DBTestBase {
}
ArenaWrappedDBIter* GetBlobIterator() {
ColumnFamilyData* column_family = cfd();
DBImpl* db_impl = dbfull();
return db_impl->NewIteratorImpl(
ReadOptions(), column_family,
column_family->GetReferencedSuperVersion(db_impl),
ReadOptions(), cfh_impl(), cfd()->GetReferencedSuperVersion(db_impl),
db_impl->GetLatestSequenceNumber(), nullptr /*read_callback*/,
true /*expose_blob_index*/);
}

View File

@ -168,6 +168,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual DBImpl* db() const { return db_; }
uint32_t GetID() const override;
const std::string& GetName() const override;

View File

@ -3618,9 +3618,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
}
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
assert(cfh != nullptr);
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
@ -3636,24 +3636,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
sv->mutable_cf_options.max_sequential_skip_in_iterations,
nullptr /* read_callback */, cfh);
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
result = NewIteratorImpl(read_options, cfd, sv,
result = NewIteratorImpl(read_options, cfh, sv,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
nullptr /* read_callback */);
}
return result;
}
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot, ReadCallback* read_callback,
const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
@ -3716,13 +3716,13 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
allow_refresh);
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
sv->current, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh);
InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), snapshot,
/* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
@ -3769,37 +3769,37 @@ Status DBImpl::NewIterators(
}
}
ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
assert(cfh_to_sv.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfd, sv] : cfd_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfd, sv,
for (auto [cfh, sv] : cfh_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
nullptr /*read_callback*/, cfh));
}
} else {
// Note: no need to consider the special case of
@ -3808,9 +3808,9 @@ Status DBImpl::NewIterators(
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfd, sv] : cfd_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback));
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/));
}
}

View File

@ -646,8 +646,8 @@ class DBImpl : public DB {
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,
bool allow_refresh = true);

View File

@ -497,8 +497,8 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
Iterator* result = nullptr;
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
assert(cfh != nullptr);
auto cfd = cfh->cfd();
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
return NewErrorIterator(Status::NotSupported(
"tailing iterator not supported in secondary mode"));
@ -517,27 +517,28 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
return NewErrorIterator(s);
}
}
result = NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/);
}
return result;
}
ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd,
const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) {
assert(nullptr != cfd);
assert(nullptr != cfh);
assert(snapshot == kMaxSequenceNumber);
snapshot = versions_->LastSequence();
assert(snapshot != kMaxSequenceNumber);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
super_version->current, snapshot,
env_, read_options, *cfh->cfd()->ioptions(),
super_version->mutable_cf_options, super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback, this, cfd,
expose_blob_index, allow_refresh);
super_version->version_number, read_callback, cfh, expose_blob_index,
allow_refresh);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetReadOptions(), cfh->cfd(), super_version, db_iter->GetArena(),
snapshot, /* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
@ -596,28 +597,29 @@ Status DBImplSecondary::NewIterators(
return Status::NotSupported("snapshot not supported in secondary mode");
} else {
SequenceNumber read_seq(kMaxSequenceNumber);
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
for (auto [cfd, sv] : cfd_to_sv) {
assert(cfh_to_sv.size() == column_families.size());
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, sv, read_seq, read_callback));
NewIteratorImpl(read_options, cfh, sv, read_seq, read_callback));
}
}
return Status::OK();

View File

@ -112,8 +112,8 @@ class DBImplSecondary : public DBImpl {
ColumnFamilyHandle* column_family) override;
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,
bool allow_refresh = true);

View File

@ -43,8 +43,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
const Comparator* cmp, InternalIterator* iter,
const Version* version, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index)
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
env_(_env),
clock_(ioptions.clock),
@ -79,8 +79,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
is_blob_(false),
arena_mode_(arena_mode),
io_activity_(read_options.io_activity),
db_impl_(db_impl),
cfd_(cfd),
cfh_(cfh),
timestamp_ub_(read_options.timestamp),
timestamp_lb_(read_options.iter_start_ts),
timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
@ -1459,7 +1458,7 @@ void DBIter::Seek(const Slice& target) {
PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
StopWatch sw(clock_, statistics_, DB_SEEK);
if (db_impl_ != nullptr && cfd_ != nullptr) {
if (cfh_ != nullptr) {
// TODO: What do we do if this returns an error?
Slice lower_bound, upper_bound;
if (iterate_lower_bound_ != nullptr) {
@ -1472,7 +1471,9 @@ void DBIter::Seek(const Slice& target) {
} else {
upper_bound = Slice("");
}
db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
cfh_->db()
->TraceIteratorSeek(cfh_->cfd()->GetID(), target, lower_bound,
upper_bound)
.PermitUncheckedError();
}
@ -1533,7 +1534,7 @@ void DBIter::SeekForPrev(const Slice& target) {
PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
StopWatch sw(clock_, statistics_, DB_SEEK);
if (db_impl_ != nullptr && cfd_ != nullptr) {
if (cfh_ != nullptr) {
// TODO: What do we do if this returns an error?
Slice lower_bound, upper_bound;
if (iterate_lower_bound_ != nullptr) {
@ -1546,8 +1547,8 @@ void DBIter::SeekForPrev(const Slice& target) {
} else {
upper_bound = Slice("");
}
db_impl_
->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
cfh_->db()
->TraceIteratorSeekForPrev(cfh_->cfd()->GetID(), target, lower_bound,
upper_bound)
.PermitUncheckedError();
}
@ -1711,13 +1712,12 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
InternalIterator* internal_iter, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index) {
DBIter* db_iter =
new DBIter(env, read_options, ioptions, mutable_cf_options,
user_key_comparator, internal_iter, version, sequence, false,
max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
expose_blob_index);
ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index) {
DBIter* db_iter = new DBIter(
env, read_options, ioptions, mutable_cf_options, user_key_comparator,
internal_iter, version, sequence, false,
max_sequential_skip_in_iterations, read_callback, cfh, expose_blob_index);
return db_iter;
}

View File

@ -118,7 +118,7 @@ class DBIter final : public Iterator {
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, const Version* version, SequenceNumber s,
bool arena_mode, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index);
// No copying allowed
@ -417,8 +417,7 @@ class DBIter final : public Iterator {
MergeContext merge_context_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
DBImpl* db_impl_;
ColumnFamilyData* cfd_;
ColumnFamilyHandleImpl* cfh_;
const Slice* const timestamp_ub_;
const Slice* const timestamp_lb_;
const size_t timestamp_size_;
@ -428,15 +427,12 @@ class DBIter final : public Iterator {
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified `sequence` number
// into appropriate user keys.
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false);
Iterator* NewDBIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const Version* version, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false);
} // namespace ROCKSDB_NAMESPACE

View File

@ -93,8 +93,8 @@ class DBIteratorTest : public DBIteratorBaseTest,
if (column_family == nullptr) {
column_family = db_->DefaultColumnFamily();
}
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto* cfd = cfh->cfd();
SequenceNumber seq = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: db_->GetLatestSequenceNumber();
@ -109,7 +109,7 @@ class DBIteratorTest : public DBIteratorBaseTest,
}
DBImpl* db_impl = dbfull();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
return db_impl->NewIteratorImpl(read_options, cfd, super_version, seq,
return db_impl->NewIteratorImpl(read_options, cfh, super_version, seq,
read_callback);
}
@ -3206,13 +3206,13 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
ASSERT_OK(Put("bar", "v7"));
SequenceNumber seq2 = db_->GetLatestSequenceNumber();
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
db_->DefaultColumnFamily());
auto* cfd = cfh->cfd();
// The iterator are suppose to see data before seq1.
DBImpl* db_impl = dbfull();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl);
Iterator* iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version,
Iterator* iter = db_impl->NewIteratorImpl(ReadOptions(), cfh, super_version,
seq2, &callback1);
// Seek
@ -3292,7 +3292,7 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
// The iterator is suppose to see data before seq3.
super_version = cfd->GetReferencedSuperVersion(db_impl);
iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version, seq4,
iter = db_impl->NewIteratorImpl(ReadOptions(), cfh, super_version, seq4,
&callback2);
// Seek to "z", which is visible.
iter->Seek("z");

View File

@ -81,12 +81,11 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
? roptions.snapshot->GetSequenceNumber()
: kMaxSequenceNumber;
ArenaWrappedDBIter* res = new ArenaWrappedDBIter();
res->Init(r->options.env, roptions, r->ioptions, r->moptions,
nullptr /* version */, sequence,
r->moptions.max_sequential_skip_in_iterations,
0 /* version_number */, nullptr /* read_callback */,
nullptr /* db_impl */, nullptr /* cfd */,
true /* expose_blob_index */, false /* allow_refresh */);
res->Init(
r->options.env, roptions, r->ioptions, r->moptions, nullptr /* version */,
sequence, r->moptions.max_sequential_skip_in_iterations,
0 /* version_number */, nullptr /* read_callback */, nullptr /* cfh */,
true /* expose_blob_index */, false /* allow_refresh */);
auto internal_iter = r->table_reader->NewIterator(
res->GetReadOptions(), r->moptions.prefix_extractor.get(),
res->GetArena(), false /* skip_filters */,

View File

@ -2128,9 +2128,9 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) {
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
->cfd();
auto* cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily());
auto* cfd = cfh->cfd();
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
ManagedSnapshot* own_snapshot = nullptr;
@ -2141,7 +2141,7 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) {
}
SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl_);
auto* iter = db_impl_->NewIteratorImpl(
read_options, cfd, sv, snapshot->GetSequenceNumber(),
read_options, cfh, sv, snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*expose_blob_index*/);
return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
}

View File

@ -422,12 +422,12 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options,
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
assert(snapshot_seq != kMaxSequenceNumber);
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto* cfd = cfh->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version,
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version,
snapshot_seq, &state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
@ -471,12 +471,12 @@ Status WritePreparedTxnDB::NewIterators(
iterators->clear();
iterators->reserve(column_families.size());
for (auto* column_family : column_families) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto* cfd = cfh->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version,
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version,
snapshot_seq, &state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);

View File

@ -470,13 +470,13 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
min_uncommitted =
static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto* cfd = cfh->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
auto* db_iter = db_impl_->NewIteratorImpl(
read_options, cfd, super_version, state->MaxVisibleSeq(),
read_options, cfh, super_version, state->MaxVisibleSeq(),
&state->callback, expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;