From 2c9aa69a93f3ff960eed6267f4c0a5741c6ba5ff Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Thu, 10 Oct 2024 15:38:59 -0700 Subject: [PATCH] Refactor the BlobDB-related parts of DBIter (#13061) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/13061 As groundwork for further changes, the patch refactors the BlobDB-related parts of `DBIter` by 1) introducing a new internal helper class `DBIter::BlobReader` that encapsulates all members needed to retrieve a blob value (namely, `Version` and the `ReadOptions` fields) and 2) factoring out and cleaning up some duplicate logic related to resolving blob references in the non-Merge (see `SetValueAndColumnsFromBlob`) and Merge (see `MergeWithBlobBaseValue`) cases. Reviewed By: jowlyzhang Differential Revision: D64078099 fbshipit-source-id: 22d5bd93e6e5be5cc9ecf6c4ee6954f2eb016aff --- db/db_iter.cc | 150 ++++++++++++++++++++++++-------------------------- db/db_iter.h | 52 +++++++++++------ 2 files changed, 107 insertions(+), 95 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index e02586377f..c632cb67b7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -52,7 +52,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, user_comparator_(cmp), merge_operator_(ioptions.merge_operator.get()), iter_(iter), - version_(version), + blob_reader_(version, read_options.read_tier, + read_options.verify_checksums, read_options.fill_cache, + read_options.io_activity), read_callback_(read_callback), sequence_(s), statistics_(ioptions.stats), @@ -71,13 +73,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, expect_total_order_inner_iter_(prefix_extractor_ == nullptr || read_options.total_order_seek || read_options.auto_prefix_mode), - read_tier_(read_options.read_tier), - fill_cache_(read_options.fill_cache), - verify_checksums_(read_options.verify_checksums), expose_blob_index_(expose_blob_index), is_blob_(false), arena_mode_(arena_mode), - io_activity_(read_options.io_activity), cfh_(cfh), timestamp_ub_(read_options.timestamp), timestamp_lb_(read_options.iter_start_ts), @@ -151,7 +149,7 @@ void DBIter::Next() { PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_); // Release temporarily pinned blocks from last operation ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); local_stats_.skip_count_ += num_internal_keys_skipped_; local_stats_.skip_count_--; @@ -194,29 +192,21 @@ void DBIter::Next() { } } -bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, - const Slice& blob_index) { - assert(!is_blob_); +Status DBIter::BlobReader::RetrieveAndSetBlobValue(const Slice& user_key, + const Slice& blob_index) { assert(blob_value_.empty()); - if (expose_blob_index_) { // Stacked BlobDB implementation - is_blob_ = true; - return true; - } - if (!version_) { - status_ = Status::Corruption("Encountered unexpected blob index."); - valid_ = false; - return false; + return Status::Corruption("Encountered unexpected blob index."); } // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to // avoid having to copy options back and forth. - // TODO: plumb Env::IOActivity, Env::IOPriority + // TODO: plumb Env::IOPriority ReadOptions read_options; read_options.read_tier = read_tier_; - read_options.fill_cache = fill_cache_; read_options.verify_checksums = verify_checksums_; + read_options.fill_cache = fill_cache_; read_options.io_activity = io_activity_; constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; @@ -224,13 +214,33 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, const Status s = version_->GetBlob(read_options, user_key, blob_index, prefetch_buffer, &blob_value_, bytes_read); + if (!s.ok()) { + return s; + } + + return Status::OK(); +} + +bool DBIter::SetValueAndColumnsFromBlob(const Slice& user_key, + const Slice& blob_index) { + assert(!is_blob_); + is_blob_ = true; + + if (expose_blob_index_) { + SetValueAndColumnsFromPlain(blob_index); + return true; + } + + const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index); if (!s.ok()) { status_ = s; valid_ = false; + is_blob_ = false; return false; } - is_blob_ = true; + SetValueAndColumnsFromPlain(blob_reader_.GetBlobValue()); + return true; } @@ -420,12 +430,9 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, } if (ikey_.type == kTypeBlobIndex) { - if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) { + if (!SetValueAndColumnsFromBlob(ikey_.user_key, iter_.value())) { return false; } - - SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value() - : blob_value_); } else if (ikey_.type == kTypeWideColumnEntity) { if (!SetValueAndColumnsFromEntity(iter_.value())) { return false; @@ -619,23 +626,9 @@ bool DBIter::MergeValuesNewToOld() { iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); } else if (kTypeBlobIndex == ikey.type) { - if (expose_blob_index_) { - status_ = - Status::NotSupported("BlobDB does not support merge operator."); - valid_ = false; + if (!MergeWithBlobBaseValue(iter_.value(), ikey.user_key)) { return false; } - // hit a put, merge the put value with operands and store the - // final result in saved_value_. We are done! - if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) { - return false; - } - valid_ = true; - if (!MergeWithPlainBaseValue(blob_value_, ikey.user_key)) { - return false; - } - - ResetBlobValue(); // iter_ is positioned after put iter_.Next(); @@ -643,6 +636,7 @@ bool DBIter::MergeValuesNewToOld() { valid_ = false; return false; } + return true; } else if (kTypeWideColumnEntity == ikey.type) { if (!MergeWithWideColumnBaseValue(iter_.value(), ikey.user_key)) { @@ -689,7 +683,7 @@ void DBIter::Prev() { PERF_COUNTER_ADD(iter_prev_count, 1); PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_); ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); ResetInternalKeysSkippedCounter(); bool ok = true; @@ -1041,21 +1035,9 @@ bool DBIter::FindValueForCurrentKey() { } return true; } else if (last_not_merge_type == kTypeBlobIndex) { - if (expose_blob_index_) { - status_ = - Status::NotSupported("BlobDB does not support merge operator."); - valid_ = false; + if (!MergeWithBlobBaseValue(pinned_value_, saved_key_.GetUserKey())) { return false; } - if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) { - return false; - } - valid_ = true; - if (!MergeWithPlainBaseValue(blob_value_, saved_key_.GetUserKey())) { - return false; - } - - ResetBlobValue(); return true; } else if (last_not_merge_type == kTypeWideColumnEntity) { @@ -1080,13 +1062,9 @@ bool DBIter::FindValueForCurrentKey() { break; case kTypeBlobIndex: - if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) { + if (!SetValueAndColumnsFromBlob(saved_key_.GetUserKey(), pinned_value_)) { return false; } - - SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_ - : blob_value_); - break; case kTypeWideColumnEntity: if (!SetValueAndColumnsFromEntity(pinned_value_)) { @@ -1190,12 +1168,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { pinned_value_ = iter_.value(); } if (ikey.type == kTypeBlobIndex) { - if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) { + if (!SetValueAndColumnsFromBlob(ikey.user_key, pinned_value_)) { return false; } - - SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_ - : blob_value_); } else if (ikey.type == kTypeWideColumnEntity) { if (!SetValueAndColumnsFromEntity(pinned_value_)) { return false; @@ -1261,21 +1236,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); } else if (ikey.type == kTypeBlobIndex) { - if (expose_blob_index_) { - status_ = - Status::NotSupported("BlobDB does not support merge operator."); - valid_ = false; + if (!MergeWithBlobBaseValue(iter_.value(), saved_key_.GetUserKey())) { return false; } - if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) { - return false; - } - valid_ = true; - if (!MergeWithPlainBaseValue(blob_value_, saved_key_.GetUserKey())) { - return false; - } - - ResetBlobValue(); return true; } else if (ikey.type == kTypeWideColumnEntity) { @@ -1342,6 +1305,35 @@ bool DBIter::MergeWithPlainBaseValue(const Slice& value, return SetValueAndColumnsFromMergeResult(s, result_type); } +bool DBIter::MergeWithBlobBaseValue(const Slice& blob_index, + const Slice& user_key) { + assert(!is_blob_); + + if (expose_blob_index_) { + status_ = + Status::NotSupported("Legacy BlobDB does not support merge operator."); + valid_ = false; + return false; + } + + const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index); + if (!s.ok()) { + status_ = s; + valid_ = false; + return false; + } + + valid_ = true; + + if (!MergeWithPlainBaseValue(blob_reader_.GetBlobValue(), user_key)) { + return false; + } + + blob_reader_.ResetBlobValue(); + + return true; +} + bool DBIter::MergeWithWideColumnBaseValue(const Slice& entity, const Slice& user_key) { // `op_failure_scope` (an output parameter) is not provided (set to nullptr) @@ -1531,7 +1523,7 @@ void DBIter::Seek(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); ResetInternalKeysSkippedCounter(); @@ -1607,7 +1599,7 @@ void DBIter::SeekForPrev(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); ResetInternalKeysSkippedCounter(); @@ -1668,7 +1660,7 @@ void DBIter::SeekToFirst() { status_.PermitUncheckedError(); direction_ = kForward; ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); ResetInternalKeysSkippedCounter(); ClearSavedValue(); @@ -1731,7 +1723,7 @@ void DBIter::SeekToLast() { status_.PermitUncheckedError(); direction_ = kReverse; ReleaseTempPinnedData(); - ResetBlobValue(); + ResetBlobData(); ResetValueAndColumns(); ResetInternalKeysSkippedCounter(); ClearSavedValue(); diff --git a/db/db_iter.h b/db/db_iter.h index e277919239..932be36154 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -219,6 +219,31 @@ class DBIter final : public Iterator { void set_valid(bool v) { valid_ = v; } private: + class BlobReader { + public: + BlobReader(const Version* version, ReadTier read_tier, + bool verify_checksums, bool fill_cache, + Env::IOActivity io_activity) + : version_(version), + read_tier_(read_tier), + verify_checksums_(verify_checksums), + fill_cache_(fill_cache), + io_activity_(io_activity) {} + + const Slice& GetBlobValue() const { return blob_value_; } + Status RetrieveAndSetBlobValue(const Slice& user_key, + const Slice& blob_index); + void ResetBlobValue() { blob_value_.Reset(); } + + private: + PinnableSlice blob_value_; + const Version* version_; + ReadTier read_tier_; + bool verify_checksums_; + bool fill_cache_; + Env::IOActivity io_activity_; + }; + // For all methods in this block: // PRE: iter_->Valid() && status_.ok() // Return false if there was an error, and status() is non-ok, valid_ = false; @@ -299,15 +324,6 @@ class DBIter final : public Iterator { : user_comparator_.CompareWithoutTimestamp(a, b); } - // Retrieves the blob value for the specified user key using the given blob - // index when using the integrated BlobDB implementation. - bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index); - - void ResetBlobValue() { - is_blob_ = false; - blob_value_.Reset(); - } - void SetValueAndColumnsFromPlain(const Slice& slice) { assert(value_.empty()); assert(wide_columns_.empty()); @@ -316,6 +332,9 @@ class DBIter final : public Iterator { wide_columns_.emplace_back(kDefaultWideColumnName, slice); } + bool SetValueAndColumnsFromBlob(const Slice& user_key, + const Slice& blob_index); + bool SetValueAndColumnsFromEntity(Slice slice); bool SetValueAndColumnsFromMergeResult(const Status& merge_status, @@ -326,11 +345,17 @@ class DBIter final : public Iterator { wide_columns_.clear(); } + void ResetBlobData() { + blob_reader_.ResetBlobValue(); + is_blob_ = false; + } + // The following methods perform the actual merge operation for the - // no base value/plain base value/wide-column base value cases. + // no/plain/blob/wide-column base value cases. // If user-defined timestamp is enabled, `user_key` includes timestamp. bool MergeWithNoBaseValue(const Slice& user_key); bool MergeWithPlainBaseValue(const Slice& value, const Slice& user_key); + bool MergeWithBlobBaseValue(const Slice& blob_index, const Slice& user_key); bool MergeWithWideColumnBaseValue(const Slice& entity, const Slice& user_key); bool PrepareValue() { @@ -356,7 +381,7 @@ class DBIter final : public Iterator { UserComparatorWrapper user_comparator_; const MergeOperator* const merge_operator_; IteratorWrapper iter_; - const Version* version_; + BlobReader blob_reader_; ReadCallback* read_callback_; // Max visible sequence number. It is normally the snapshot seq unless we have // uncommitted data in db as in WriteUnCommitted. @@ -376,7 +401,6 @@ class DBIter final : public Iterator { std::string saved_value_; Slice pinned_value_; // for prefix seek mode to support prev() - PinnableSlice blob_value_; // Value of the default column Slice value_; // All columns (i.e. name-value pairs) @@ -410,15 +434,11 @@ class DBIter final : public Iterator { // Expect the inner iterator to maintain a total order. // prefix_extractor_ must be non-NULL if the value is false. const bool expect_total_order_inner_iter_; - ReadTier read_tier_; - bool fill_cache_; - bool verify_checksums_; // Whether the iterator is allowed to expose blob references. Set to true when // the stacked BlobDB implementation is used, false otherwise. bool expose_blob_index_; bool is_blob_; bool arena_mode_; - const Env::IOActivity io_activity_; // List of operands for merge operator. MergeContext merge_context_; LocalStatistics local_stats_;