From 2ea109521f63461bc75783b711de626b438e1715 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Wed, 9 Nov 2022 12:54:05 -0800 Subject: [PATCH] Revisit the interface of MergeHelper::TimedFullMerge(WithEntity) (#10932) Summary: The patch refines/reworks `MergeHelper::TimedFullMerge(WithEntity)` a bit in two ways. First, it eliminates the recently introduced `TimedFullMerge` overload, which makes the responsibilities clearer by making sure the query result (`value` for `Get`, `columns` for `GetEntity`) is set uniformly in `SaveValue` and `GetContext`. Second, it changes the interface of `TimedFullMergeWithEntity` so it exposes its result in a serialized form; this is a more decoupled design which will come in handy when adding support for `Merge` with wide-column entities to `DBIter`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10932 Test Plan: `make check` Reviewed By: akankshamahajan15 Differential Revision: D41129399 Pulled By: ltamasi fbshipit-source-id: 69d8da358c77d4fc7e8c40f4dafc2c129a710677 --- db/memtable.cc | 64 +++++++++++++++++++++++++++++++++++++------- db/merge_helper.cc | 59 ++++++---------------------------------- db/merge_helper.h | 13 ++------- db/version_set.cc | 10 ++++--- table/get_context.cc | 63 +++++++++++++++++++++++++++++++++++-------- 5 files changed, 124 insertions(+), 85 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index 829a3c0995..98b8644688 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1066,11 +1066,21 @@ static bool SaveValue(void* arg, const char* entry) { assert(s->do_merge); if (s->value || s->columns) { + std::string result; *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), &v, - merge_context->GetOperands(), s->value, s->columns, s->logger, - s->statistics, s->clock, /* result_operand */ nullptr, + merge_context->GetOperands(), &result, s->logger, s->statistics, + s->clock, /* result_operand */ nullptr, /* update_num_ops_stats */ true); + + if (s->status->ok()) { + if (s->value) { + *(s->value) = std::move(result); + } else { + assert(s->columns); + s->columns->SetPlainValue(result); + } + } } } else if (s->value) { s->value->assign(v.data(), v.size()); @@ -1115,11 +1125,27 @@ static bool SaveValue(void* arg, const char* entry) { } else if (*(s->merge_in_progress)) { assert(s->do_merge); - if (s->value || s->columns) { + if (s->value) { + Slice value_of_default; + *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( + v, value_of_default); + if (s->status->ok()) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &value_of_default, + merge_context->GetOperands(), s->value, s->logger, + s->statistics, s->clock, /* result_operand */ nullptr, + /* update_num_ops_stats */ true); + } + } else if (s->columns) { + std::string result; *(s->status) = MergeHelper::TimedFullMergeWithEntity( merge_operator, s->key->user_key(), v, - merge_context->GetOperands(), s->value, s->columns, s->logger, - s->statistics, s->clock, /* update_num_ops_stats */ true); + merge_context->GetOperands(), &result, s->logger, s->statistics, + s->clock, /* update_num_ops_stats */ true); + + if (s->status->ok()) { + *(s->status) = s->columns->SetWideColumnValue(result); + } } } else if (s->value) { Slice value_of_default; @@ -1150,11 +1176,21 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeRangeDeletion: { if (*(s->merge_in_progress)) { if (s->value || s->columns) { + std::string result; *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, s->columns, s->logger, - s->statistics, s->clock, /* result_operand */ nullptr, + merge_context->GetOperands(), &result, s->logger, s->statistics, + s->clock, /* result_operand */ nullptr, /* update_num_ops_stats */ true); + + if (s->status->ok()) { + if (s->value) { + *(s->value) = std::move(result); + } else { + assert(s->columns); + s->columns->SetPlainValue(result); + } + } } } else { *(s->status) = Status::NotFound(); @@ -1180,11 +1216,21 @@ static bool SaveValue(void* arg, const char* entry) { if (s->do_merge && merge_operator->ShouldMerge( merge_context->GetOperandsDirectionBackward())) { if (s->value || s->columns) { + std::string result; *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, - merge_context->GetOperands(), s->value, s->columns, s->logger, - s->statistics, s->clock, /* result_operand */ nullptr, + merge_context->GetOperands(), &result, s->logger, s->statistics, + s->clock, /* result_operand */ nullptr, /* update_num_ops_stats */ true); + + if (s->status->ok()) { + if (s->value) { + *(s->value) = std::move(result); + } else { + assert(s->columns); + s->columns->SetPlainValue(result); + } + } } *(s->found_final_value) = true; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 5a7c5765e0..2d37d6d332 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -112,44 +112,10 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, return Status::OK(); } -Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, - const Slice& key, const Slice* base_value, - const std::vector& operands, - std::string* value, - PinnableWideColumns* columns, Logger* logger, - Statistics* statistics, SystemClock* clock, - Slice* result_operand, - bool update_num_ops_stats) { - assert(value || columns); - assert(!value || !columns); - - std::string result; - const Status s = - TimedFullMerge(merge_operator, key, base_value, operands, &result, logger, - statistics, clock, result_operand, update_num_ops_stats); - if (!s.ok()) { - return s; - } - - if (value) { - *value = std::move(result); - return Status::OK(); - } - - assert(columns); - columns->SetPlainValue(result); - - return Status::OK(); -} - Status MergeHelper::TimedFullMergeWithEntity( const MergeOperator* merge_operator, const Slice& key, Slice base_entity, - const std::vector& operands, std::string* value, - PinnableWideColumns* columns, Logger* logger, Statistics* statistics, - SystemClock* clock, bool update_num_ops_stats) { - assert(value || columns); - assert(!value || !columns); - + const std::vector& operands, std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) { WideColumns base_columns; { @@ -168,44 +134,35 @@ Status MergeHelper::TimedFullMergeWithEntity( value_of_default = base_columns[0].value(); } - std::string result; + std::string merge_result; { constexpr Slice* result_operand = nullptr; const Status s = TimedFullMerge( - merge_operator, key, &value_of_default, operands, &result, logger, + merge_operator, key, &value_of_default, operands, &merge_result, logger, statistics, clock, result_operand, update_num_ops_stats); if (!s.ok()) { return s; } } - if (value) { - *value = std::move(result); - return Status::OK(); - } - - assert(columns); - - std::string output; - if (has_default_column) { - base_columns[0].value() = result; + base_columns[0].value() = merge_result; - const Status s = WideColumnSerialization::Serialize(base_columns, output); + const Status s = WideColumnSerialization::Serialize(base_columns, *result); if (!s.ok()) { return s; } } else { const Status s = - WideColumnSerialization::Serialize(result, base_columns, output); + WideColumnSerialization::Serialize(merge_result, base_columns, *result); if (!s.ok()) { return s; } } - return columns->SetWideColumnValue(output); + return Status::OK(); } // PRE: iter points to the first merge type entry diff --git a/db/merge_helper.h b/db/merge_helper.h index 923850a089..790ec62390 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -57,19 +57,10 @@ class MergeHelper { Slice* result_operand, bool update_num_ops_stats); - static Status TimedFullMerge(const MergeOperator* merge_operator, - const Slice& key, const Slice* base_value, - const std::vector& operands, - std::string* value, PinnableWideColumns* columns, - Logger* logger, Statistics* statistics, - SystemClock* clock, Slice* result_operand, - bool update_num_ops_stats); - static Status TimedFullMergeWithEntity( const MergeOperator* merge_operator, const Slice& key, Slice base_entity, - const std::vector& operands, std::string* value, - PinnableWideColumns* columns, Logger* logger, Statistics* statistics, - SystemClock* clock, bool update_num_ops_stats); + const std::vector& operands, std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, bool update_num_ops_stats); // During compaction, merge entries until we hit // - a corrupted key diff --git a/db/version_set.cc b/db/version_set.cc index aa0dc394ee..cac3a0a9e1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2384,15 +2384,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; - std::string* str_value = value != nullptr ? value->GetSelf() : nullptr; - if (str_value || columns) { + if (value || columns) { + std::string result; *status = MergeHelper::TimedFullMerge( merge_operator_, user_key, nullptr, merge_context->GetOperands(), - str_value, columns, info_log_, db_statistics_, clock_, + &result, info_log_, db_statistics_, clock_, /* result_operand */ nullptr, /* update_num_ops_stats */ true); if (status->ok()) { if (LIKELY(value != nullptr)) { + *(value->GetSelf()) = std::move(result); value->PinSelf(); + } else { + assert(columns != nullptr); + columns->SetPlainValue(result); } } } diff --git a/table/get_context.cc b/table/get_context.cc index b2daa1789f..69e7527147 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -468,10 +468,10 @@ void GetContext::Merge(const Slice* value) { assert(do_merge_); assert(!pinnable_val_ || !columns_); + std::string result; const Status s = MergeHelper::TimedFullMerge( - merge_operator_, user_key_, value, merge_context_->GetOperands(), - pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, - statistics_, clock_, /* result_operand */ nullptr, + merge_operator_, user_key_, value, merge_context_->GetOperands(), &result, + logger_, statistics_, clock_, /* result_operand */ nullptr, /* update_num_ops_stats */ true); if (!s.ok()) { state_ = kCorrupt; @@ -479,25 +479,66 @@ void GetContext::Merge(const Slice* value) { } if (LIKELY(pinnable_val_ != nullptr)) { + *(pinnable_val_->GetSelf()) = std::move(result); pinnable_val_->PinSelf(); + return; } + + assert(columns_); + columns_->SetPlainValue(result); } void GetContext::MergeWithEntity(Slice entity) { assert(do_merge_); assert(!pinnable_val_ || !columns_); - const Status s = MergeHelper::TimedFullMergeWithEntity( - merge_operator_, user_key_, entity, merge_context_->GetOperands(), - pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_, - statistics_, clock_, /* update_num_ops_stats */ true); - if (!s.ok()) { - state_ = kCorrupt; + if (LIKELY(pinnable_val_ != nullptr)) { + Slice value_of_default; + + { + const Status s = WideColumnSerialization::GetValueOfDefaultColumn( + entity, value_of_default); + if (!s.ok()) { + state_ = kCorrupt; + return; + } + } + + { + const Status s = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, &value_of_default, + merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_, + statistics_, clock_, /* result_operand */ nullptr, + /* update_num_ops_stats */ true); + if (!s.ok()) { + state_ = kCorrupt; + return; + } + } + + pinnable_val_->PinSelf(); return; } - if (LIKELY(pinnable_val_ != nullptr)) { - pinnable_val_->PinSelf(); + std::string result; + + { + const Status s = MergeHelper::TimedFullMergeWithEntity( + merge_operator_, user_key_, entity, merge_context_->GetOperands(), + &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true); + if (!s.ok()) { + state_ = kCorrupt; + return; + } + } + + { + assert(columns_); + const Status s = columns_->SetWideColumnValue(result); + if (!s.ok()) { + state_ = kCorrupt; + return; + } } }