diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 5a7028e774..e82e0cbf09 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -6,10 +6,12 @@ #include #include "db/db_test_util.h" +#include "db/dbformat.h" #include "db/forward_iterator.h" #include "port/stack_trace.h" #include "rocksdb/merge_operator.h" #include "rocksdb/snapshot.h" +#include "rocksdb/utilities/debug.h" #include "util/random.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend2.h" @@ -949,6 +951,98 @@ TEST_P(PerConfigMergeOperatorPinningTest, Randomized) { VerifyDBFromMap(true_data); } +TEST_F(DBMergeOperatorTest, MaxSuccessiveMergesBaseValues) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreatePutOperator(); + options.max_successive_merges = 1; + options.env = env_; + Reopen(options); + + constexpr char foo[] = "foo"; + constexpr char bar[] = "bar"; + constexpr char baz[] = "baz"; + constexpr char qux[] = "qux"; + constexpr char corge[] = "corge"; + + // No base value + { + constexpr char key[] = "key1"; + + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, foo)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar)); + + PinnableSlice result; + ASSERT_OK( + db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)); + ASSERT_EQ(result, bar); + + // We expect the second Merge to be converted to a Put because of + // max_successive_merges. + constexpr size_t max_key_versions = 8; + std::vector key_versions; + ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key, + max_key_versions, &key_versions)); + ASSERT_EQ(key_versions.size(), 2); + ASSERT_EQ(key_versions[0].type, kTypeValue); + ASSERT_EQ(key_versions[1].type, kTypeMerge); + } + + // Plain base value + { + constexpr char key[] = "key2"; + + ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), key, foo)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, baz)); + + PinnableSlice result; + ASSERT_OK( + db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)); + ASSERT_EQ(result, baz); + + // We expect the second Merge to be converted to a Put because of + // max_successive_merges. + constexpr size_t max_key_versions = 8; + std::vector key_versions; + ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key, + max_key_versions, &key_versions)); + ASSERT_EQ(key_versions.size(), 3); + ASSERT_EQ(key_versions[0].type, kTypeValue); + ASSERT_EQ(key_versions[1].type, kTypeMerge); + ASSERT_EQ(key_versions[2].type, kTypeValue); + } + + // Wide-column base value + { + constexpr char key[] = "key3"; + const WideColumns columns{{kDefaultWideColumnName, foo}, {bar, baz}}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), key, + columns)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, qux)); + ASSERT_OK( + db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, corge)); + + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key, + &result)); + const WideColumns expected{{kDefaultWideColumnName, corge}, {bar, baz}}; + ASSERT_EQ(result.columns(), expected); + + // We expect the second Merge to be converted to a PutEntity because of + // max_successive_merges. + constexpr size_t max_key_versions = 8; + std::vector key_versions; + ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key, + max_key_versions, &key_versions)); + ASSERT_EQ(key_versions.size(), 3); + ASSERT_EQ(key_versions[0].type, kTypeWideColumnEntity); + ASSERT_EQ(key_versions[1].type, kTypeMerge); + ASSERT_EQ(key_versions[2].type, kTypeWideColumnEntity); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 9d212fc51a..d8b1d788bb 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -298,6 +298,20 @@ Status MergeHelper::TimedFullMerge( result_type, op_failure_scope); } +Status MergeHelper::TimedFullMerge( + const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag, + const WideColumns& columns, const std::vector& operands, + Logger* logger, Statistics* statistics, SystemClock* clock, + bool update_num_ops_stats, std::string* result, Slice* result_operand, + ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope) { + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns); + + return TimedFullMergeImpl(merge_operator, key, std::move(existing_value), + operands, logger, statistics, clock, + update_num_ops_stats, result, result_operand, + result_type, op_failure_scope); +} + Status MergeHelper::TimedFullMerge( const MergeOperator* merge_operator, const Slice& key, NoBaseValueTag, const std::vector& operands, Logger* logger, Statistics* statistics, @@ -351,6 +365,21 @@ Status MergeHelper::TimedFullMerge( op_failure_scope); } +Status MergeHelper::TimedFullMerge( + const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag, + const WideColumns& columns, const std::vector& operands, + Logger* logger, Statistics* statistics, SystemClock* clock, + bool update_num_ops_stats, std::string* result_value, + PinnableWideColumns* result_entity, + MergeOperator::OpFailureScope* op_failure_scope) { + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns); + + return TimedFullMergeImpl(merge_operator, key, std::move(existing_value), + operands, logger, statistics, clock, + update_num_ops_stats, result_value, result_entity, + op_failure_scope); +} + // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) // keys_, operands_ are updated to reflect the merge result. diff --git a/db/merge_helper.h b/db/merge_helper.h index 93267c9a92..84c5f35351 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -85,6 +85,13 @@ class MergeHelper { std::string* result, Slice* result_operand, ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope); + static Status TimedFullMerge( + const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag, + const WideColumns& columns, const std::vector& operands, + Logger* logger, Statistics* statistics, SystemClock* clock, + bool update_num_ops_stats, std::string* result, Slice* result_operand, + ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope); + // Variants that expose the merge result translated to the form requested by // the client. (For example, if the result is a wide-column structure but the // client requested the results in plain-value form, the value of the default @@ -112,6 +119,16 @@ class MergeHelper { std::string* result_value, PinnableWideColumns* result_entity, MergeOperator::OpFailureScope* op_failure_scope); + static Status TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, WideBaseValueTag, + const WideColumns& columns, + const std::vector& operands, + Logger* logger, Statistics* statistics, + SystemClock* clock, bool update_num_ops_stats, + std::string* result_value, + PinnableWideColumns* result_entity, + MergeOperator::OpFailureScope* op_failure_scope); + // During compaction, merge entries until we hit // - a corrupted key // - a Put/Delete, diff --git a/db/wide/wide_columns_helper.h b/db/wide/wide_columns_helper.h index 86c77c02d9..a870fae30d 100644 --- a/db/wide/wide_columns_helper.h +++ b/db/wide/wide_columns_helper.h @@ -24,6 +24,11 @@ class WideColumnsHelper { return !columns.empty() && columns.front().name() == kDefaultWideColumnName; } + static bool HasDefaultColumnOnly(const WideColumns& columns) { + return columns.size() == 1 && + columns.front().name() == kDefaultWideColumnName; + } + static const Slice& GetDefaultColumn(const WideColumns& columns) { assert(HasDefaultColumn(columns)); return columns.front().value(); diff --git a/db/write_batch.cc b/db/write_batch.cc index 17ccca2fa7..4502a81ffb 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -2483,15 +2483,15 @@ class MemTableInserter : public WriteBatch::Handler { } if (perform_merge) { - // TODO: support wide-column base values for max_successive_merges - - // 1) Get the existing value - std::string get_value; + // 1) Get the existing value. Use the wide column APIs to make sure we + // don't lose any columns in the process. + PinnableWideColumns existing; // Pass in the sequence number so that we also include previous merge // operations in the same batch. SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; + // TODO: plumb Env::IOActivity ReadOptions read_options; read_options.snapshot = &read_from_snapshot; @@ -2500,28 +2500,47 @@ class MemTableInserter : public WriteBatch::Handler { if (cf_handle == nullptr) { cf_handle = db_->DefaultColumnFamily(); } - Status get_status = db_->Get(read_options, cf_handle, key, &get_value); + + Status get_status = + db_->GetEntity(read_options, cf_handle, key, &existing); if (!get_status.ok()) { // Failed to read a key we know exists. Store the delta in memtable. perform_merge = false; } else { - Slice get_value_slice = Slice(get_value); - // 2) Apply this merge auto merge_operator = moptions->merge_operator; assert(merge_operator); + const auto& columns = existing.columns(); + + Status merge_status; std::string new_value; ValueType new_value_type; - // `op_failure_scope` (an output parameter) is not provided (set to - // nullptr) since a failure must be propagated regardless of its value. - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator, key, MergeHelper::kPlainBaseValue, get_value_slice, - {value}, moptions->info_log, moptions->statistics, - SystemClock::Default().get(), - /* update_num_ops_stats */ false, &new_value, - /* result_operand */ nullptr, &new_value_type, - /* op_failure_scope */ nullptr); + + if (WideColumnsHelper::HasDefaultColumnOnly(columns)) { + // `op_failure_scope` (an output parameter) is not provided (set to + // nullptr) since a failure must be propagated regardless of its + // value. + merge_status = MergeHelper::TimedFullMerge( + merge_operator, key, MergeHelper::kPlainBaseValue, + WideColumnsHelper::GetDefaultColumn(columns), {value}, + moptions->info_log, moptions->statistics, + SystemClock::Default().get(), + /* update_num_ops_stats */ false, &new_value, + /* result_operand */ nullptr, &new_value_type, + /* op_failure_scope */ nullptr); + } else { + // `op_failure_scope` (an output parameter) is not provided (set to + // nullptr) since a failure must be propagated regardless of its + // value. + merge_status = MergeHelper::TimedFullMerge( + merge_operator, key, MergeHelper::kWideBaseValue, columns, + {value}, moptions->info_log, moptions->statistics, + SystemClock::Default().get(), + /* update_num_ops_stats */ false, &new_value, + /* result_operand */ nullptr, &new_value_type, + /* op_failure_scope */ nullptr); + } if (!merge_status.ok()) { // Failed to merge! @@ -2530,12 +2549,13 @@ class MemTableInserter : public WriteBatch::Handler { } else { // 3) Add value to memtable assert(!concurrent_memtable_writes_); + assert(new_value_type == kTypeValue || + new_value_type == kTypeWideColumnEntity); + if (kv_prot_info != nullptr) { auto merged_kv_prot_info = kv_prot_info->StripC(column_family_id).ProtectS(sequence_); merged_kv_prot_info.UpdateV(value, new_value); - assert(new_value_type == kTypeValue || - new_value_type == kTypeWideColumnEntity); merged_kv_prot_info.UpdateO(kTypeMerge, new_value_type); ret_status = mem->Add(sequence_, new_value_type, key, new_value, &merged_kv_prot_info); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index ff2483dce8..6edf0637f4 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1100,8 +1100,7 @@ std::string LDBCommand::PrintKeyValueOrWideColumns( const Slice& key, const Slice& value, const WideColumns& wide_columns, bool is_key_hex, bool is_value_hex) { if (wide_columns.empty() || - (wide_columns.size() == 1 && - WideColumnsHelper::HasDefaultColumn(wide_columns))) { + WideColumnsHelper::HasDefaultColumnOnly(wide_columns)) { return PrintKeyValue(key.ToString(), value.ToString(), is_key_hex, is_value_hex); } diff --git a/unreleased_history/bug_fixes/max_successive_merges_wide_columns.md b/unreleased_history/bug_fixes/max_successive_merges_wide_columns.md new file mode 100644 index 0000000000..d24b6cf308 --- /dev/null +++ b/unreleased_history/bug_fixes/max_successive_merges_wide_columns.md @@ -0,0 +1 @@ +Fixed the handling of wide-column base values in the `max_successive_merges` logic.