diff --git a/db/merge_operator.cc b/db/merge_operator.cc index d325856406..e5d80c8cb6 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -9,6 +9,10 @@ #include "rocksdb/merge_operator.h" +#include + +#include "util/overload.h" + namespace ROCKSDB_NAMESPACE { bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in, @@ -23,6 +27,84 @@ bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in, &merge_out->new_value, merge_in.logger); } +bool MergeOperator::FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const { + assert(merge_out); + + MergeOperationInput in_v2(merge_in.key, nullptr, merge_in.operand_list, + merge_in.logger); + + std::string new_value; + Slice existing_operand(nullptr, 0); + MergeOperationOutput out_v2(new_value, existing_operand); + + return std::visit( + overload{ + [&](const auto& existing) -> bool { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + in_v2.existing_value = &existing; + } + + const bool result = FullMergeV2(in_v2, &out_v2); + if (!result) { + merge_out->op_failure_scope = out_v2.op_failure_scope; + return false; + } + + if (existing_operand.data()) { + merge_out->new_value = existing_operand; + } else { + merge_out->new_value = std::move(new_value); + } + + return true; + }, + [&](const WideColumns& existing_columns) -> bool { + const bool has_default_column = + !existing_columns.empty() && + existing_columns.front().name() == kDefaultWideColumnName; + + Slice value_of_default; + if (has_default_column) { + value_of_default = existing_columns.front().value(); + } + + in_v2.existing_value = &value_of_default; + + const bool result = FullMergeV2(in_v2, &out_v2); + if (!result) { + merge_out->op_failure_scope = out_v2.op_failure_scope; + return false; + } + + merge_out->new_value = MergeOperationOutputV3::NewColumns(); + auto& new_columns = std::get( + merge_out->new_value); + new_columns.reserve(has_default_column + ? existing_columns.size() + : (existing_columns.size() + 1)); + + if (existing_operand.data()) { + new_columns.emplace_back(kDefaultWideColumnName.ToString(), + existing_operand.ToString()); + } else { + new_columns.emplace_back(kDefaultWideColumnName.ToString(), + std::move(new_value)); + } + + for (size_t i = has_default_column ? 1 : 0; + i < existing_columns.size(); ++i) { + new_columns.emplace_back(existing_columns[i].name().ToString(), + existing_columns[i].value().ToString()); + } + + return true; + }}, + merge_in.existing_value); +} + // The default implementation of PartialMergeMulti, which invokes // PartialMerge multiple times internally and merges two operands at // a time. diff --git a/db/merge_test.cc b/db/merge_test.cc index 6d1333e550..93a8535a7e 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -18,6 +18,7 @@ #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/utilities/db_ttl.h" +#include "rocksdb/wide_columns.h" #include "test_util/testharness.h" #include "util/coding.h" #include "utilities/merge_operators.h" @@ -607,6 +608,272 @@ TEST_F(MergeTest, MergeWithCompactionAndFlush) { ASSERT_OK(DestroyDB(dbname, Options())); } +TEST_F(MergeTest, FullMergeV3FallbackNewValue) { + // Test that the default FullMergeV3 implementation correctly handles the case + // when FullMergeV2 results in a new value. + + const Slice key("foo"); + const MergeOperator::MergeOperationInputV3::OperandList operands{ + "first", "second", "third"}; + constexpr Logger* logger = nullptr; + + auto append_operator = + MergeOperators::CreateStringAppendOperator(std::string()); + + // No existing value + { + MergeOperator::MergeOperationInputV3::ExistingValue existing_value; + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = std::get(merge_out.new_value); + ASSERT_EQ(result, operands[0].ToString() + operands[1].ToString() + + operands[2].ToString()); + } + + // Plain existing value + { + const Slice plain("plain"); + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = std::get(merge_out.new_value); + ASSERT_EQ(result, plain.ToString() + operands[0].ToString() + + operands[1].ToString() + operands[2].ToString()); + } + + // Wide-column existing value with default column + { + const WideColumns entity{ + {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = + std::get( + merge_out.new_value); + ASSERT_EQ(result.size(), entity.size()); + ASSERT_EQ(result[0].first, entity[0].name()); + ASSERT_EQ(result[0].second, + entity[0].value().ToString() + operands[0].ToString() + + operands[1].ToString() + operands[2].ToString()); + ASSERT_EQ(result[1].first, entity[1].name()); + ASSERT_EQ(result[1].second, entity[1].value()); + ASSERT_EQ(result[2].first, entity[2].name()); + ASSERT_EQ(result[2].second, entity[2].value()); + } + + // Wide-column existing value without default column + { + const WideColumns entity{{"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = + std::get( + merge_out.new_value); + ASSERT_EQ(result.size(), entity.size() + 1); + ASSERT_EQ(result[0].first, kDefaultWideColumnName); + ASSERT_EQ(result[0].second, operands[0].ToString() + + operands[1].ToString() + + operands[2].ToString()); + ASSERT_EQ(result[1].first, entity[0].name()); + ASSERT_EQ(result[1].second, entity[0].value()); + ASSERT_EQ(result[2].first, entity[1].name()); + ASSERT_EQ(result[2].second, entity[1].value()); + } +} + +TEST_F(MergeTest, FullMergeV3FallbackExistingOperand) { + // Test that the default FullMergeV3 implementation correctly handles the case + // when FullMergeV2 results in an existing operand. + + const Slice key("foo"); + const MergeOperator::MergeOperationInputV3::OperandList operands{ + "first", "second", "third"}; + constexpr Logger* logger = nullptr; + + auto put_operator = MergeOperators::CreatePutOperator(); + + // No existing value + { + MergeOperator::MergeOperationInputV3::ExistingValue existing_value; + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = std::get(merge_out.new_value); + ASSERT_EQ(result.data(), operands.back().data()); + ASSERT_EQ(result.size(), operands.back().size()); + } + + // Plain existing value + { + const Slice plain("plain"); + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = std::get(merge_out.new_value); + ASSERT_EQ(result.data(), operands.back().data()); + ASSERT_EQ(result.size(), operands.back().size()); + } + + // Wide-column existing value with default column + { + const WideColumns entity{ + {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = + std::get( + merge_out.new_value); + ASSERT_EQ(result.size(), entity.size()); + ASSERT_EQ(result[0].first, entity[0].name()); + ASSERT_EQ(result[0].second, operands.back()); + ASSERT_EQ(result[1].first, entity[1].name()); + ASSERT_EQ(result[1].second, entity[1].value()); + ASSERT_EQ(result[2].first, entity[2].name()); + ASSERT_EQ(result[2].second, entity[2].value()); + } + + // Wide-column existing value without default column + { + const WideColumns entity{{"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out)); + + const auto& result = + std::get( + merge_out.new_value); + ASSERT_EQ(result.size(), entity.size() + 1); + ASSERT_EQ(result[0].first, kDefaultWideColumnName); + ASSERT_EQ(result[0].second, operands.back()); + ASSERT_EQ(result[1].first, entity[0].name()); + ASSERT_EQ(result[1].second, entity[0].value()); + ASSERT_EQ(result[2].first, entity[1].name()); + ASSERT_EQ(result[2].second, entity[1].value()); + } +} + +TEST_F(MergeTest, FullMergeV3FallbackFailure) { + // Test that the default FullMergeV3 implementation correctly handles the case + // when FullMergeV2 fails. + + const Slice key("foo"); + const MergeOperator::MergeOperationInputV3::OperandList operands{ + "first", "second", "third"}; + constexpr Logger* logger = nullptr; + + class FailMergeOperator : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& /* merge_in */, + MergeOperationOutput* merge_out) const override { + assert(merge_out); + merge_out->op_failure_scope = OpFailureScope::kMustMerge; + + return false; + } + + const char* Name() const override { return "FailMergeOperator"; } + }; + + FailMergeOperator fail_operator; + + // No existing value + { + MergeOperator::MergeOperationInputV3::ExistingValue existing_value; + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out)); + ASSERT_EQ(merge_out.op_failure_scope, + MergeOperator::OpFailureScope::kMustMerge); + } + + // Plain existing value + { + const Slice plain("plain"); + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out)); + ASSERT_EQ(merge_out.op_failure_scope, + MergeOperator::OpFailureScope::kMustMerge); + } + + // Wide-column existing value with default column + { + const WideColumns entity{ + {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out)); + ASSERT_EQ(merge_out.op_failure_scope, + MergeOperator::OpFailureScope::kMustMerge); + } + + // Wide-column existing value without default column + { + const WideColumns entity{{"one", "1"}, {"two", "2"}}; + MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity); + const MergeOperator::MergeOperationInputV3 merge_in( + key, std::move(existing_value), operands, logger); + + MergeOperator::MergeOperationOutputV3 merge_out; + + ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out)); + ASSERT_EQ(merge_out.op_failure_scope, + MergeOperator::OpFailureScope::kMustMerge); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index 077130475d..4db9380b6b 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -8,10 +8,13 @@ #include #include #include +#include +#include #include #include "rocksdb/customizable.h" #include "rocksdb/slice.h" +#include "rocksdb/wide_columns.h" namespace ROCKSDB_NAMESPACE { @@ -158,6 +161,55 @@ class MergeOperator : public Customizable { virtual bool FullMergeV2(const MergeOperationInput& merge_in, MergeOperationOutput* merge_out) const; + struct MergeOperationInputV3 { + using ExistingValue = std::variant; + using OperandList = std::vector; + + explicit MergeOperationInputV3(const Slice& _key, + ExistingValue&& _existing_value, + const OperandList& _operand_list, + Logger* _logger) + : key(_key), + existing_value(std::move(_existing_value)), + operand_list(_operand_list), + logger(_logger) {} + + // The user key, including the user-defined timestamp if applicable. + const Slice& key; + // The base value of the merge operation. Can be one of three things (see + // the ExistingValue variant above): no existing value, plain existing + // value, or wide-column existing value. + ExistingValue existing_value; + // The list of operands to apply. + const OperandList& operand_list; + // The logger to use in case a failure happens during the merge operation. + Logger* logger; + }; + + struct MergeOperationOutputV3 { + using NewColumns = std::vector>; + using NewValue = std::variant; + + // The result of the merge operation. Can be one of three things (see the + // NewValue variant above): a new plain value, a new wide-column value, or + // an existing merge operand. + NewValue new_value; + // The scope of the failure if applicable. See above for more details. + OpFailureScope op_failure_scope = OpFailureScope::kDefault; + }; + + // ************************** UNDER CONSTRUCTION ***************************** + // An extended version of FullMergeV2() that supports wide columns on both the + // input and the output side, enabling the application to perform general + // transformations during merges. For backward compatibility, the default + // implementation calls FullMergeV2(). Specifically, if there is no base value + // or the base value is a plain key-value, the default implementation falls + // back to FullMergeV2(). If the base value is a wide-column entity, the + // default implementation invokes FullMergeV2() to perform the merge on the + // default column, and leaves any other columns unchanged. + virtual bool FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const; + // This function performs merge(left_op, right_op) // when both the operands are themselves merge operation types // that you would have passed to a DB::Merge() call in the same order diff --git a/util/overload.h b/util/overload.h new file mode 100644 index 0000000000..428e805deb --- /dev/null +++ b/util/overload.h @@ -0,0 +1,23 @@ +// (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +// A helper template that can combine multiple functors into a single one to be +// used with std::visit for example. It also works with lambdas, since it +// comes with an explicit deduction guide. +template +struct overload : Ts... { + using Ts::operator()...; +}; + +template +overload(Ts...) -> overload; + +} // namespace ROCKSDB_NAMESPACE