Refactor the BlobDB-related parts of DBIter (#13061)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13061

As groundwork for further changes, the patch refactors the BlobDB-related parts of `DBIter` by 1) introducing a new internal helper class `DBIter::BlobReader` that encapsulates all members needed to retrieve a blob value (namely, `Version` and the `ReadOptions` fields) and 2) factoring out and cleaning up some duplicate logic related to resolving blob references in the non-Merge (see `SetValueAndColumnsFromBlob`) and Merge (see `MergeWithBlobBaseValue`) cases.

Reviewed By: jowlyzhang

Differential Revision: D64078099

fbshipit-source-id: 22d5bd93e6e5be5cc9ecf6c4ee6954f2eb016aff
This commit is contained in:
Levi Tamasi 2024-10-10 15:38:59 -07:00 committed by Facebook GitHub Bot
parent fe6c8cb1d6
commit 2c9aa69a93
2 changed files with 107 additions and 95 deletions

View File

@ -52,7 +52,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
user_comparator_(cmp),
merge_operator_(ioptions.merge_operator.get()),
iter_(iter),
version_(version),
blob_reader_(version, read_options.read_tier,
read_options.verify_checksums, read_options.fill_cache,
read_options.io_activity),
read_callback_(read_callback),
sequence_(s),
statistics_(ioptions.stats),
@ -71,13 +73,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
read_options.total_order_seek ||
read_options.auto_prefix_mode),
read_tier_(read_options.read_tier),
fill_cache_(read_options.fill_cache),
verify_checksums_(read_options.verify_checksums),
expose_blob_index_(expose_blob_index),
is_blob_(false),
arena_mode_(arena_mode),
io_activity_(read_options.io_activity),
cfh_(cfh),
timestamp_ub_(read_options.timestamp),
timestamp_lb_(read_options.iter_start_ts),
@ -151,7 +149,7 @@ void DBIter::Next() {
PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
local_stats_.skip_count_ += num_internal_keys_skipped_;
local_stats_.skip_count_--;
@ -194,29 +192,21 @@ void DBIter::Next() {
}
}
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
const Slice& blob_index) {
assert(!is_blob_);
Status DBIter::BlobReader::RetrieveAndSetBlobValue(const Slice& user_key,
const Slice& blob_index) {
assert(blob_value_.empty());
if (expose_blob_index_) { // Stacked BlobDB implementation
is_blob_ = true;
return true;
}
if (!version_) {
status_ = Status::Corruption("Encountered unexpected blob index.");
valid_ = false;
return false;
return Status::Corruption("Encountered unexpected blob index.");
}
// TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
// avoid having to copy options back and forth.
// TODO: plumb Env::IOActivity, Env::IOPriority
// TODO: plumb Env::IOPriority
ReadOptions read_options;
read_options.read_tier = read_tier_;
read_options.fill_cache = fill_cache_;
read_options.verify_checksums = verify_checksums_;
read_options.fill_cache = fill_cache_;
read_options.io_activity = io_activity_;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;
@ -224,13 +214,33 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
const Status s = version_->GetBlob(read_options, user_key, blob_index,
prefetch_buffer, &blob_value_, bytes_read);
if (!s.ok()) {
return s;
}
return Status::OK();
}
bool DBIter::SetValueAndColumnsFromBlob(const Slice& user_key,
const Slice& blob_index) {
assert(!is_blob_);
is_blob_ = true;
if (expose_blob_index_) {
SetValueAndColumnsFromPlain(blob_index);
return true;
}
const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index);
if (!s.ok()) {
status_ = s;
valid_ = false;
is_blob_ = false;
return false;
}
is_blob_ = true;
SetValueAndColumnsFromPlain(blob_reader_.GetBlobValue());
return true;
}
@ -420,12 +430,9 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
}
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
if (!SetValueAndColumnsFromBlob(ikey_.user_key, iter_.value())) {
return false;
}
SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value()
: blob_value_);
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetValueAndColumnsFromEntity(iter_.value())) {
return false;
@ -619,23 +626,9 @@ bool DBIter::MergeValuesNewToOld() {
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
if (!MergeWithBlobBaseValue(iter_.value(), ikey.user_key)) {
return false;
}
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
return false;
}
valid_ = true;
if (!MergeWithPlainBaseValue(blob_value_, ikey.user_key)) {
return false;
}
ResetBlobValue();
// iter_ is positioned after put
iter_.Next();
@ -643,6 +636,7 @@ bool DBIter::MergeValuesNewToOld() {
valid_ = false;
return false;
}
return true;
} else if (kTypeWideColumnEntity == ikey.type) {
if (!MergeWithWideColumnBaseValue(iter_.value(), ikey.user_key)) {
@ -689,7 +683,7 @@ void DBIter::Prev() {
PERF_COUNTER_ADD(iter_prev_count, 1);
PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
bool ok = true;
@ -1041,21 +1035,9 @@ bool DBIter::FindValueForCurrentKey() {
}
return true;
} else if (last_not_merge_type == kTypeBlobIndex) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
if (!MergeWithBlobBaseValue(pinned_value_, saved_key_.GetUserKey())) {
return false;
}
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
valid_ = true;
if (!MergeWithPlainBaseValue(blob_value_, saved_key_.GetUserKey())) {
return false;
}
ResetBlobValue();
return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) {
@ -1080,13 +1062,9 @@ bool DBIter::FindValueForCurrentKey() {
break;
case kTypeBlobIndex:
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
if (!SetValueAndColumnsFromBlob(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
: blob_value_);
break;
case kTypeWideColumnEntity:
if (!SetValueAndColumnsFromEntity(pinned_value_)) {
@ -1190,12 +1168,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
pinned_value_ = iter_.value();
}
if (ikey.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
if (!SetValueAndColumnsFromBlob(ikey.user_key, pinned_value_)) {
return false;
}
SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
: blob_value_);
} else if (ikey.type == kTypeWideColumnEntity) {
if (!SetValueAndColumnsFromEntity(pinned_value_)) {
return false;
@ -1261,21 +1236,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
if (!MergeWithBlobBaseValue(iter_.value(), saved_key_.GetUserKey())) {
return false;
}
if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
return false;
}
valid_ = true;
if (!MergeWithPlainBaseValue(blob_value_, saved_key_.GetUserKey())) {
return false;
}
ResetBlobValue();
return true;
} else if (ikey.type == kTypeWideColumnEntity) {
@ -1342,6 +1305,35 @@ bool DBIter::MergeWithPlainBaseValue(const Slice& value,
return SetValueAndColumnsFromMergeResult(s, result_type);
}
bool DBIter::MergeWithBlobBaseValue(const Slice& blob_index,
const Slice& user_key) {
assert(!is_blob_);
if (expose_blob_index_) {
status_ =
Status::NotSupported("Legacy BlobDB does not support merge operator.");
valid_ = false;
return false;
}
const Status s = blob_reader_.RetrieveAndSetBlobValue(user_key, blob_index);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
valid_ = true;
if (!MergeWithPlainBaseValue(blob_reader_.GetBlobValue(), user_key)) {
return false;
}
blob_reader_.ResetBlobValue();
return true;
}
bool DBIter::MergeWithWideColumnBaseValue(const Slice& entity,
const Slice& user_key) {
// `op_failure_scope` (an output parameter) is not provided (set to nullptr)
@ -1531,7 +1523,7 @@ void DBIter::Seek(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
@ -1607,7 +1599,7 @@ void DBIter::SeekForPrev(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
@ -1668,7 +1660,7 @@ void DBIter::SeekToFirst() {
status_.PermitUncheckedError();
direction_ = kForward;
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
@ -1731,7 +1723,7 @@ void DBIter::SeekToLast() {
status_.PermitUncheckedError();
direction_ = kReverse;
ReleaseTempPinnedData();
ResetBlobValue();
ResetBlobData();
ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();

View File

@ -219,6 +219,31 @@ class DBIter final : public Iterator {
void set_valid(bool v) { valid_ = v; }
private:
class BlobReader {
public:
BlobReader(const Version* version, ReadTier read_tier,
bool verify_checksums, bool fill_cache,
Env::IOActivity io_activity)
: version_(version),
read_tier_(read_tier),
verify_checksums_(verify_checksums),
fill_cache_(fill_cache),
io_activity_(io_activity) {}
const Slice& GetBlobValue() const { return blob_value_; }
Status RetrieveAndSetBlobValue(const Slice& user_key,
const Slice& blob_index);
void ResetBlobValue() { blob_value_.Reset(); }
private:
PinnableSlice blob_value_;
const Version* version_;
ReadTier read_tier_;
bool verify_checksums_;
bool fill_cache_;
Env::IOActivity io_activity_;
};
// For all methods in this block:
// PRE: iter_->Valid() && status_.ok()
// Return false if there was an error, and status() is non-ok, valid_ = false;
@ -299,15 +324,6 @@ class DBIter final : public Iterator {
: user_comparator_.CompareWithoutTimestamp(a, b);
}
// Retrieves the blob value for the specified user key using the given blob
// index when using the integrated BlobDB implementation.
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
void ResetBlobValue() {
is_blob_ = false;
blob_value_.Reset();
}
void SetValueAndColumnsFromPlain(const Slice& slice) {
assert(value_.empty());
assert(wide_columns_.empty());
@ -316,6 +332,9 @@ class DBIter final : public Iterator {
wide_columns_.emplace_back(kDefaultWideColumnName, slice);
}
bool SetValueAndColumnsFromBlob(const Slice& user_key,
const Slice& blob_index);
bool SetValueAndColumnsFromEntity(Slice slice);
bool SetValueAndColumnsFromMergeResult(const Status& merge_status,
@ -326,11 +345,17 @@ class DBIter final : public Iterator {
wide_columns_.clear();
}
void ResetBlobData() {
blob_reader_.ResetBlobValue();
is_blob_ = false;
}
// The following methods perform the actual merge operation for the
// no base value/plain base value/wide-column base value cases.
// no/plain/blob/wide-column base value cases.
// If user-defined timestamp is enabled, `user_key` includes timestamp.
bool MergeWithNoBaseValue(const Slice& user_key);
bool MergeWithPlainBaseValue(const Slice& value, const Slice& user_key);
bool MergeWithBlobBaseValue(const Slice& blob_index, const Slice& user_key);
bool MergeWithWideColumnBaseValue(const Slice& entity, const Slice& user_key);
bool PrepareValue() {
@ -356,7 +381,7 @@ class DBIter final : public Iterator {
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
IteratorWrapper iter_;
const Version* version_;
BlobReader blob_reader_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
@ -376,7 +401,6 @@ class DBIter final : public Iterator {
std::string saved_value_;
Slice pinned_value_;
// for prefix seek mode to support prev()
PinnableSlice blob_value_;
// Value of the default column
Slice value_;
// All columns (i.e. name-value pairs)
@ -410,15 +434,11 @@ class DBIter final : public Iterator {
// Expect the inner iterator to maintain a total order.
// prefix_extractor_ must be non-NULL if the value is false.
const bool expect_total_order_inner_iter_;
ReadTier read_tier_;
bool fill_cache_;
bool verify_checksums_;
// Whether the iterator is allowed to expose blob references. Set to true when
// the stacked BlobDB implementation is used, false otherwise.
bool expose_blob_index_;
bool is_blob_;
bool arena_mode_;
const Env::IOActivity io_activity_;
// List of operands for merge operator.
MergeContext merge_context_;
LocalStatistics local_stats_;