From c00c16855d065557562745f7b751f482f42857a9 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 1 Mar 2024 10:28:20 -0800 Subject: [PATCH] Access DBImpl* and CFD* by CFHImpl* in Iterators (#12395) Summary: In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR https://github.com/facebook/rocksdb/issues/12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs: https://github.com/facebook/rocksdb/issues/11925 #11943, and https://github.com/facebook/rocksdb/issues/11977. To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12395 Test Plan: # Summary In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR #12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs: #11925 #11943, and #11977. To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`. # Test Plan There should be no behavior changes. Existing tests and CI for the correctness tests. **Test for Perf Regression** Build ``` $> make -j64 release ``` Setup ``` $> TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=1000000 -compression_type=none ``` Run ``` TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="newiterator,seekrandom" -cache_size=10485760000 ``` Before the change ``` DB path: [/dev/shm/db_bench/dbbench] newiterator : 0.552 micros/op 1810157 ops/sec 0.552 seconds 1000000 operations; DB path: [/dev/shm/db_bench/dbbench] seekrandom : 4.502 micros/op 222143 ops/sec 4.502 seconds 1000000 operations; (0 of 1000000 found) ``` After the change ``` DB path: [/dev/shm/db_bench/dbbench] newiterator : 0.520 micros/op 1924401 ops/sec 0.520 seconds 1000000 operations; DB path: [/dev/shm/db_bench/dbbench] seekrandom : 4.532 micros/op 220657 ops/sec 4.532 seconds 1000000 operations; (0 of 1000000 found) ``` Reviewed By: pdillinger Differential Revision: D54332713 Pulled By: jaykorean fbshipit-source-id: b28d897ad519e58b1ca82eb068a6319544a4fae5 --- db/arena_wrapped_db_iter.cc | 57 ++++++++++--------- db/arena_wrapped_db_iter.h | 16 +++--- db/blob/db_blob_index_test.cc | 10 ++-- db/column_family.h | 1 + db/db_impl/db_impl.cc | 54 +++++++++--------- db/db_impl/db_impl.h | 4 +- db/db_impl/db_impl_secondary.cc | 36 ++++++------ db/db_impl/db_impl_secondary.h | 4 +- db/db_iter.cc | 32 +++++------ db/db_iter.h | 22 +++---- db/db_iterator_test.cc | 16 +++--- table/sst_file_reader.cc | 11 ++-- utilities/blob_db/blob_db_impl.cc | 8 +-- .../transactions/write_prepared_txn_db.cc | 12 ++-- .../transactions/write_unprepared_txn_db.cc | 6 +- 15 files changed, 143 insertions(+), 146 deletions(-) diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index e6dcb66962..919ea56650 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -43,14 +43,13 @@ void ArenaWrappedDBIter::Init( Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, const Version* version, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, - uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl, - ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) { + uint64_t version_number, ReadCallback* read_callback, + ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); - db_iter_ = - new (mem) DBIter(env, read_options, ioptions, mutable_cf_options, - ioptions.user_comparator, /* iter */ nullptr, version, - sequence, true, max_sequential_skip_in_iteration, - read_callback, db_impl, cfd, expose_blob_index); + db_iter_ = new (mem) DBIter( + env, read_options, ioptions, mutable_cf_options, ioptions.user_comparator, + /* iter */ nullptr, version, sequence, true, + max_sequential_skip_in_iteration, read_callback, cfh, expose_blob_index); sv_number_ = version_number; read_options_ = read_options; allow_refresh_ = allow_refresh; @@ -65,40 +64,44 @@ void ArenaWrappedDBIter::Init( Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); } Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { - if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { + if (cfh_ == nullptr || !allow_refresh_) { return Status::NotSupported("Creating renew iterator is not allowed."); } assert(db_iter_ != nullptr); + auto cfd = cfh_->cfd(); + auto db_impl = cfh_->db(); + // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the // correct behavior. Will be corrected automatically when we take a snapshot // here for the case of WritePreparedTxnDB. - uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); + uint64_t cur_sv_number = cfd->GetSuperVersionNumber(); // If we recreate a new internal iterator below (NewInternalIterator()), // we will pass in read_options_. We need to make sure it // has the right snapshot. read_options_.snapshot = snapshot; TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1"); TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2"); + auto reinit_internal_iter = [&]() { Env* env = db_iter_->env(); db_iter_->~DBIter(); arena_.~Arena(); new (&arena_) Arena(); - SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl); assert(sv->version_number >= cur_sv_number); - SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot); + SequenceNumber read_seq = GetSeqNum(db_impl, snapshot); if (read_callback_) { read_callback_->Refresh(read_seq); } - Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, + Init(env, read_options_, *(cfd->ioptions()), sv->mutable_cf_options, sv->current, read_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, read_callback_, db_impl_, cfd_, expose_blob_index_, + sv->version_number, read_callback_, cfh_, expose_blob_index_, allow_refresh_); - InternalIterator* internal_iter = db_impl_->NewInternalIterator( - read_options_, cfd_, sv, &arena_, read_seq, + InternalIterator* internal_iter = db_impl->NewInternalIterator( + read_options_, cfd, sv, &arena_, read_seq, /* allow_unprepared_value */ true, /* db_iter */ this); SetIterUnderDBIter(internal_iter); }; @@ -107,10 +110,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { reinit_internal_iter(); break; } else { - SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot); + SequenceNumber read_seq = GetSeqNum(db_impl, snapshot); // Refresh range-tombstones in MemTable if (!read_options_.ignore_range_deletions) { - SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_); + SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_impl); TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr); auto t = sv->mem->NewRangeTombstoneIterator( read_options_, read_seq, false /* immutable_memtable */); @@ -123,13 +126,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { // will be freed during db_iter destruction there. if (memtable_range_tombstone_iter_) { assert(!*memtable_range_tombstone_iter_ || - sv_number_ != cfd_->GetSuperVersionNumber()); + sv_number_ != cfd->GetSuperVersionNumber()); } delete t; } else { // current mutable memtable has range tombstones if (!memtable_range_tombstone_iter_) { delete t; - db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv); + db_impl->ReturnAndCleanupSuperVersion(cfd, sv); // The memtable under DBIter did not have range tombstone before // refresh. reinit_internal_iter(); @@ -138,13 +141,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) { delete *memtable_range_tombstone_iter_; *memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator( std::unique_ptr(t), - &cfd_->internal_comparator(), nullptr, nullptr); + &cfd->internal_comparator(), nullptr, nullptr); } } - db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv); + db_impl->ReturnAndCleanupSuperVersion(cfd, sv); } // Check again if the latest super version number is changed - uint64_t latest_sv_number = cfd_->GetSuperVersionNumber(); + uint64_t latest_sv_number = cfd->GetSuperVersionNumber(); if (latest_sv_number != cur_sv_number) { // If the super version number is changed after refreshing, // fallback to Re-Init the InternalIterator @@ -163,14 +166,14 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, const Version* version, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl, - ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) { + uint64_t version_number, ReadCallback* read_callback, + ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence, max_sequential_skip_in_iterations, version_number, read_callback, - db_impl, cfd, expose_blob_index, allow_refresh); - if (db_impl != nullptr && cfd != nullptr && allow_refresh) { - iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index); + cfh, expose_blob_index, allow_refresh); + if (cfh != nullptr && allow_refresh) { + iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index); } return iter; diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index 2d832cbc98..5c4d2db777 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -87,15 +87,14 @@ class ArenaWrappedDBIter : public Iterator { const MutableCFOptions& mutable_cf_options, const Version* version, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh); // Store some parameters so we can refresh the iterator at a later point // with these same params - void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd, + void StoreRefreshInfo(ColumnFamilyHandleImpl* cfh, ReadCallback* read_callback, bool expose_blob_index) { - db_impl_ = db_impl; - cfd_ = cfd; + cfh_ = cfh; read_callback_ = read_callback; expose_blob_index_ = expose_blob_index; } @@ -104,8 +103,7 @@ class ArenaWrappedDBIter : public Iterator { DBIter* db_iter_ = nullptr; Arena arena_; uint64_t sv_number_; - ColumnFamilyData* cfd_ = nullptr; - DBImpl* db_impl_ = nullptr; + ColumnFamilyHandleImpl* cfh_ = nullptr; ReadOptions read_options_; ReadCallback* read_callback_; bool expose_blob_index_ = false; @@ -116,13 +114,13 @@ class ArenaWrappedDBIter : public Iterator { }; // Generate the arena wrapped iterator class. -// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not +// `cfh` is used for reneweal. If left null, renewal will not // be supported. ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, const Version* version, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, ReadCallback* read_callback, - DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, - bool expose_blob_index = false, bool allow_refresh = true); + ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false, + bool allow_refresh = true); } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/db_blob_index_test.cc b/db/blob/db_blob_index_test.cc index 8ca4837fff..1edaedb8d6 100644 --- a/db/blob/db_blob_index_test.cc +++ b/db/blob/db_blob_index_test.cc @@ -45,10 +45,10 @@ class DBBlobIndexTest : public DBTestBase { DBBlobIndexTest() : DBTestBase("db_blob_index_test", /*env_do_fsync=*/true) {} ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); } - - ColumnFamilyData* cfd() { - return static_cast_with_check(cfh())->cfd(); + ColumnFamilyHandleImpl* cfh_impl() { + return static_cast_with_check(cfh()); } + ColumnFamilyData* cfd() { return cfh_impl()->cfd(); } Status PutBlobIndex(WriteBatch* batch, const Slice& key, const Slice& blob_index) { @@ -96,11 +96,9 @@ class DBBlobIndexTest : public DBTestBase { } ArenaWrappedDBIter* GetBlobIterator() { - ColumnFamilyData* column_family = cfd(); DBImpl* db_impl = dbfull(); return db_impl->NewIteratorImpl( - ReadOptions(), column_family, - column_family->GetReferencedSuperVersion(db_impl), + ReadOptions(), cfh_impl(), cfd()->GetReferencedSuperVersion(db_impl), db_impl->GetLatestSequenceNumber(), nullptr /*read_callback*/, true /*expose_blob_index*/); } diff --git a/db/column_family.h b/db/column_family.h index ce05c45a25..b6b0ed8d5d 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -168,6 +168,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { // destroy without mutex virtual ~ColumnFamilyHandleImpl(); virtual ColumnFamilyData* cfd() const { return cfd_; } + virtual DBImpl* db() const { return db_; } uint32_t GetID() const override; const std::string& GetName() const override; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index ab83e86d70..8e83ad0831 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3618,9 +3618,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options, } auto cfh = static_cast_with_check(column_family); + assert(cfh != nullptr); ColumnFamilyData* cfd = cfh->cfd(); assert(cfd != nullptr); - ReadCallback* read_callback = nullptr; // No read callback provided. SuperVersion* sv = cfd->GetReferencedSuperVersion(this); if (read_options.timestamp && read_options.timestamp->size() > 0) { const Status s = @@ -3636,24 +3636,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options, result = NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback, - this, cfd); + sv->mutable_cf_options.max_sequential_skip_in_iterations, + nullptr /* read_callback */, cfh); } else { // Note: no need to consider the special case of // last_seq_same_as_publish_seq_==false since NewIterator is overridden in // WritePreparedTxnDB - result = NewIteratorImpl(read_options, cfd, sv, + result = NewIteratorImpl(read_options, cfh, sv, (read_options.snapshot != nullptr) ? read_options.snapshot->GetSequenceNumber() : kMaxSequenceNumber, - read_callback); + nullptr /* read_callback */); } return result; } ArenaWrappedDBIter* DBImpl::NewIteratorImpl( - const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* sv, - SequenceNumber snapshot, ReadCallback* read_callback, + const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh, + SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) { TEST_SYNC_POINT("DBImpl::NewIterator:1"); TEST_SYNC_POINT("DBImpl::NewIterator:2"); @@ -3716,13 +3716,13 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl( // likely that any iterator pointer is close to the iterator it points to so // that they are likely to be in the same cache line and/or page. ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current, - snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, read_callback, this, cfd, expose_blob_index, - allow_refresh); + env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options, + sv->current, snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh); InternalIterator* internal_iter = NewInternalIterator( - db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot, + db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), snapshot, /* allow_unprepared_value */ true, db_iter); db_iter->SetIterUnderDBIter(internal_iter); @@ -3769,37 +3769,37 @@ Status DBImpl::NewIterators( } } - ReadCallback* read_callback = nullptr; // No read callback provided. iterators->clear(); iterators->reserve(column_families.size()); - autovector> cfd_to_sv; + autovector> cfh_to_sv; const bool check_read_ts = read_options.timestamp && read_options.timestamp->size() > 0; - for (auto cfh : column_families) { - auto cfd = static_cast_with_check(cfh)->cfd(); + for (auto cf : column_families) { + auto cfh = static_cast_with_check(cf); + auto cfd = cfh->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - cfd_to_sv.emplace_back(cfd, sv); + cfh_to_sv.emplace_back(cfh, sv); if (check_read_ts) { const Status s = FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp)); if (!s.ok()) { - for (auto prev_entry : cfd_to_sv) { + for (auto prev_entry : cfh_to_sv) { CleanupSuperVersion(std::get<1>(prev_entry)); } return s; } } } - assert(cfd_to_sv.size() == column_families.size()); + assert(cfh_to_sv.size() == column_families.size()); if (read_options.tailing) { - for (auto [cfd, sv] : cfd_to_sv) { - auto iter = new ForwardIterator(this, read_options, cfd, sv, + for (auto [cfh, sv] : cfh_to_sv) { + auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv, /* allow_unprepared_value */ true); iterators->push_back(NewDBIterator( - env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, - cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber, + env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options, + cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_callback, this, cfd)); + nullptr /*read_callback*/, cfh)); } } else { // Note: no need to consider the special case of @@ -3808,9 +3808,9 @@ Status DBImpl::NewIterators( auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); - for (auto [cfd, sv] : cfd_to_sv) { - iterators->push_back( - NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback)); + for (auto [cfh, sv] : cfh_to_sv) { + iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot, + nullptr /*read_callback*/)); } } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index eb0f6e0430..327a49344e 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -646,8 +646,8 @@ class DBImpl : public DB { // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file. ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, - ColumnFamilyData* cfd, SuperVersion* sv, - SequenceNumber snapshot, + ColumnFamilyHandleImpl* cfh, + SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback, bool expose_blob_index = false, bool allow_refresh = true); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 235a528ba0..29c8990c9b 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -497,8 +497,8 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options, Iterator* result = nullptr; auto cfh = static_cast_with_check(column_family); + assert(cfh != nullptr); auto cfd = cfh->cfd(); - ReadCallback* read_callback = nullptr; // No read callback provided. if (read_options.tailing) { return NewErrorIterator(Status::NotSupported( "tailing iterator not supported in secondary mode")); @@ -517,27 +517,28 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options, return NewErrorIterator(s); } } - result = NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback); + result = NewIteratorImpl(read_options, cfh, sv, snapshot, + nullptr /*read_callback*/); } return result; } ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( - const ReadOptions& read_options, ColumnFamilyData* cfd, + const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh, SuperVersion* super_version, SequenceNumber snapshot, ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) { - assert(nullptr != cfd); + assert(nullptr != cfh); assert(snapshot == kMaxSequenceNumber); snapshot = versions_->LastSequence(); assert(snapshot != kMaxSequenceNumber); auto db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, - super_version->current, snapshot, + env_, read_options, *cfh->cfd()->ioptions(), + super_version->mutable_cf_options, super_version->current, snapshot, super_version->mutable_cf_options.max_sequential_skip_in_iterations, - super_version->version_number, read_callback, this, cfd, - expose_blob_index, allow_refresh); + super_version->version_number, read_callback, cfh, expose_blob_index, + allow_refresh); auto internal_iter = NewInternalIterator( - db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), + db_iter->GetReadOptions(), cfh->cfd(), super_version, db_iter->GetArena(), snapshot, /* allow_unprepared_value */ true, db_iter); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; @@ -596,28 +597,29 @@ Status DBImplSecondary::NewIterators( return Status::NotSupported("snapshot not supported in secondary mode"); } else { SequenceNumber read_seq(kMaxSequenceNumber); - autovector> cfd_to_sv; + autovector> cfh_to_sv; const bool check_read_ts = read_options.timestamp && read_options.timestamp->size() > 0; - for (auto cfh : column_families) { - ColumnFamilyData* cfd = static_cast(cfh)->cfd(); + for (auto cf : column_families) { + auto cfh = static_cast_with_check(cf); + auto cfd = cfh->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - cfd_to_sv.emplace_back(cfd, sv); + cfh_to_sv.emplace_back(cfh, sv); if (check_read_ts) { const Status s = FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp)); if (!s.ok()) { - for (auto prev_entry : cfd_to_sv) { + for (auto prev_entry : cfh_to_sv) { CleanupSuperVersion(std::get<1>(prev_entry)); } return s; } } } - assert(cfd_to_sv.size() == column_families.size()); - for (auto [cfd, sv] : cfd_to_sv) { + assert(cfh_to_sv.size() == column_families.size()); + for (auto [cfh, sv] : cfh_to_sv) { iterators->push_back( - NewIteratorImpl(read_options, cfd, sv, read_seq, read_callback)); + NewIteratorImpl(read_options, cfh, sv, read_seq, read_callback)); } } return Status::OK(); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index a1878d9c7a..f1a40af379 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -112,8 +112,8 @@ class DBImplSecondary : public DBImpl { ColumnFamilyHandle* column_family) override; ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, - ColumnFamilyData* cfd, SuperVersion* sv, - SequenceNumber snapshot, + ColumnFamilyHandleImpl* cfh, + SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback, bool expose_blob_index = false, bool allow_refresh = true); diff --git a/db/db_iter.cc b/db/db_iter.cc index 55125883ad..1547ec0a2e 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -43,8 +43,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, const Comparator* cmp, InternalIterator* iter, const Version* version, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, DBImpl* db_impl, - ColumnFamilyData* cfd, bool expose_blob_index) + ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, + bool expose_blob_index) : prefix_extractor_(mutable_cf_options.prefix_extractor.get()), env_(_env), clock_(ioptions.clock), @@ -79,8 +79,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, is_blob_(false), arena_mode_(arena_mode), io_activity_(read_options.io_activity), - db_impl_(db_impl), - cfd_(cfd), + cfh_(cfh), timestamp_ub_(read_options.timestamp), timestamp_lb_(read_options.iter_start_ts), timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) { @@ -1459,7 +1458,7 @@ void DBIter::Seek(const Slice& target) { PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_); StopWatch sw(clock_, statistics_, DB_SEEK); - if (db_impl_ != nullptr && cfd_ != nullptr) { + if (cfh_ != nullptr) { // TODO: What do we do if this returns an error? Slice lower_bound, upper_bound; if (iterate_lower_bound_ != nullptr) { @@ -1472,7 +1471,9 @@ void DBIter::Seek(const Slice& target) { } else { upper_bound = Slice(""); } - db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound) + cfh_->db() + ->TraceIteratorSeek(cfh_->cfd()->GetID(), target, lower_bound, + upper_bound) .PermitUncheckedError(); } @@ -1533,7 +1534,7 @@ void DBIter::SeekForPrev(const Slice& target) { PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_); StopWatch sw(clock_, statistics_, DB_SEEK); - if (db_impl_ != nullptr && cfd_ != nullptr) { + if (cfh_ != nullptr) { // TODO: What do we do if this returns an error? Slice lower_bound, upper_bound; if (iterate_lower_bound_ != nullptr) { @@ -1546,8 +1547,8 @@ void DBIter::SeekForPrev(const Slice& target) { } else { upper_bound = Slice(""); } - db_impl_ - ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound, + cfh_->db() + ->TraceIteratorSeekForPrev(cfh_->cfd()->GetID(), target, lower_bound, upper_bound) .PermitUncheckedError(); } @@ -1711,13 +1712,12 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, InternalIterator* internal_iter, const Version* version, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, DBImpl* db_impl, - ColumnFamilyData* cfd, bool expose_blob_index) { - DBIter* db_iter = - new DBIter(env, read_options, ioptions, mutable_cf_options, - user_key_comparator, internal_iter, version, sequence, false, - max_sequential_skip_in_iterations, read_callback, db_impl, cfd, - expose_blob_index); + ReadCallback* read_callback, + ColumnFamilyHandleImpl* cfh, bool expose_blob_index) { + DBIter* db_iter = new DBIter( + env, read_options, ioptions, mutable_cf_options, user_key_comparator, + internal_iter, version, sequence, false, + max_sequential_skip_in_iterations, read_callback, cfh, expose_blob_index); return db_iter; } diff --git a/db/db_iter.h b/db/db_iter.h index e618dab919..1882b02b23 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -118,7 +118,7 @@ class DBIter final : public Iterator { const MutableCFOptions& mutable_cf_options, const Comparator* cmp, InternalIterator* iter, const Version* version, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, + ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh, bool expose_blob_index); // No copying allowed @@ -417,8 +417,7 @@ class DBIter final : public Iterator { MergeContext merge_context_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; - DBImpl* db_impl_; - ColumnFamilyData* cfd_; + ColumnFamilyHandleImpl* cfh_; const Slice* const timestamp_ub_; const Slice* const timestamp_lb_; const size_t timestamp_size_; @@ -428,15 +427,12 @@ class DBIter final : public Iterator { // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified `sequence` number // into appropriate user keys. -Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, - const ImmutableOptions& ioptions, - const MutableCFOptions& mutable_cf_options, - const Comparator* user_key_comparator, - InternalIterator* internal_iter, const Version* version, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - ReadCallback* read_callback, DBImpl* db_impl = nullptr, - ColumnFamilyData* cfd = nullptr, - bool expose_blob_index = false); +Iterator* NewDBIterator( + Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + const Comparator* user_key_comparator, InternalIterator* internal_iter, + const Version* version, const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback, + ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false); } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 1c17974bbe..39ef21db16 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -93,8 +93,8 @@ class DBIteratorTest : public DBIteratorBaseTest, if (column_family == nullptr) { column_family = db_->DefaultColumnFamily(); } - auto* cfd = - static_cast_with_check(column_family)->cfd(); + auto* cfh = static_cast_with_check(column_family); + auto* cfd = cfh->cfd(); SequenceNumber seq = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : db_->GetLatestSequenceNumber(); @@ -109,7 +109,7 @@ class DBIteratorTest : public DBIteratorBaseTest, } DBImpl* db_impl = dbfull(); SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl); - return db_impl->NewIteratorImpl(read_options, cfd, super_version, seq, + return db_impl->NewIteratorImpl(read_options, cfh, super_version, seq, read_callback); } @@ -3206,13 +3206,13 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { ASSERT_OK(Put("bar", "v7")); SequenceNumber seq2 = db_->GetLatestSequenceNumber(); - auto* cfd = - static_cast_with_check(db_->DefaultColumnFamily()) - ->cfd(); + auto* cfh = static_cast_with_check( + db_->DefaultColumnFamily()); + auto* cfd = cfh->cfd(); // The iterator are suppose to see data before seq1. DBImpl* db_impl = dbfull(); SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl); - Iterator* iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version, + Iterator* iter = db_impl->NewIteratorImpl(ReadOptions(), cfh, super_version, seq2, &callback1); // Seek @@ -3292,7 +3292,7 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { // The iterator is suppose to see data before seq3. super_version = cfd->GetReferencedSuperVersion(db_impl); - iter = db_impl->NewIteratorImpl(ReadOptions(), cfd, super_version, seq4, + iter = db_impl->NewIteratorImpl(ReadOptions(), cfh, super_version, seq4, &callback2); // Seek to "z", which is visible. iter->Seek("z"); diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc index b34496f756..39da7038b2 100644 --- a/table/sst_file_reader.cc +++ b/table/sst_file_reader.cc @@ -81,12 +81,11 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) { ? roptions.snapshot->GetSequenceNumber() : kMaxSequenceNumber; ArenaWrappedDBIter* res = new ArenaWrappedDBIter(); - res->Init(r->options.env, roptions, r->ioptions, r->moptions, - nullptr /* version */, sequence, - r->moptions.max_sequential_skip_in_iterations, - 0 /* version_number */, nullptr /* read_callback */, - nullptr /* db_impl */, nullptr /* cfd */, - true /* expose_blob_index */, false /* allow_refresh */); + res->Init( + r->options.env, roptions, r->ioptions, r->moptions, nullptr /* version */, + sequence, r->moptions.max_sequential_skip_in_iterations, + 0 /* version_number */, nullptr /* read_callback */, nullptr /* cfh */, + true /* expose_blob_index */, false /* allow_refresh */); auto internal_iter = r->table_reader->NewIterator( res->GetReadOptions(), r->moptions.prefix_extractor.get(), res->GetArena(), false /* skip_filters */, diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 5d3cf03c92..c4c336a357 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -2128,9 +2128,9 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) { if (read_options.io_activity == Env::IOActivity::kUnknown) { read_options.io_activity = Env::IOActivity::kDBIterator; } - auto* cfd = - static_cast_with_check(DefaultColumnFamily()) - ->cfd(); + auto* cfh = + static_cast_with_check(DefaultColumnFamily()); + auto* cfd = cfh->cfd(); // Get a snapshot to avoid blob file get deleted between we // fetch and index entry and reading from the file. ManagedSnapshot* own_snapshot = nullptr; @@ -2141,7 +2141,7 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) { } SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl_); auto* iter = db_impl_->NewIteratorImpl( - read_options, cfd, sv, snapshot->GetSequenceNumber(), + read_options, cfh, sv, snapshot->GetSequenceNumber(), nullptr /*read_callback*/, true /*expose_blob_index*/); return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index f6b83f32fa..3641a1e6a8 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -422,12 +422,12 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options, own_snapshot = std::make_shared(db_impl_, snapshot); } assert(snapshot_seq != kMaxSequenceNumber); - auto* cfd = - static_cast_with_check(column_family)->cfd(); + auto* cfh = static_cast_with_check(column_family); + auto* cfd = cfh->cfd(); auto* state = new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted); SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_); - auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version, + auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version, snapshot_seq, &state->callback, expose_blob_index, allow_refresh); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); @@ -471,12 +471,12 @@ Status WritePreparedTxnDB::NewIterators( iterators->clear(); iterators->reserve(column_families.size()); for (auto* column_family : column_families) { - auto* cfd = - static_cast_with_check(column_family)->cfd(); + auto* cfh = static_cast_with_check(column_family); + auto* cfd = cfh->cfd(); auto* state = new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted); SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_); - auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, super_version, + auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfh, super_version, snapshot_seq, &state->callback, expose_blob_index, allow_refresh); db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr); diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 8aecda26eb..304a3c200d 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -470,13 +470,13 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options, min_uncommitted = static_cast_with_check(snapshot)->min_uncommitted_; - auto* cfd = - static_cast_with_check(column_family)->cfd(); + auto* cfh = static_cast_with_check(column_family); + auto* cfd = cfh->cfd(); auto* state = new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_); auto* db_iter = db_impl_->NewIteratorImpl( - read_options, cfd, super_version, state->MaxVisibleSeq(), + read_options, cfh, super_version, state->MaxVisibleSeq(), &state->callback, expose_blob_index, allow_refresh); db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); return db_iter;