Introduce a wide column aware MergeOperator API (#11807)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11807

For now, RocksDB has limited support for using merge with wide columns: when a bunch of merge operands have to be applied to a wide-column base value, RocksDB currently passes only the value of the default column to the application's `MergeOperator`, which means there is no way to update any other columns during a merge. As a first step in making this more general, the patch adds a new API `FullMergeV3` to `MergeOperator`.

`FullMergeV3`'s interface enables applications to receive a plain, wide-column, or non-existent base value as merge input, and to produce a new plain value, a new wide-column value, or an existing operand as merge output. Note that there are no limitations on the column names and values if the merge result is a wide-column entity. Also, the interface is general in the sense that it makes it possible e.g. for a merge that takes a plain base value and some deltas to produce a wide-column entity as a result.

For backward compatibility, the default implementation of `FullMergeV3` falls back to `FullMergeV2` and implements the current logic where merge operands are applied to the default column of the base entity and any other columns are unchanged. (Note that with `FullMergeV3` in the `MergeOperator` interface, this behavior will become customizable.)

This patch just introduces the new API and the default backward compatible implementation. I plan to integrate `FullMergeV3` into the query and compaction logic in subsequent diffs.

Reviewed By: jaykorean

Differential Revision: D49117253

fbshipit-source-id: 109e016f25cd130fc504790818d927bae7fec6bd
This commit is contained in:
Levi Tamasi 2023-09-11 12:13:58 -07:00 committed by Facebook GitHub Bot
parent ed5b6c0d99
commit 760ea373a8
4 changed files with 424 additions and 0 deletions

View File

@ -9,6 +9,10 @@
#include "rocksdb/merge_operator.h"
#include <type_traits>
#include "util/overload.h"
namespace ROCKSDB_NAMESPACE {
bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
@ -23,6 +27,84 @@ bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
&merge_out->new_value, merge_in.logger);
}
bool MergeOperator::FullMergeV3(const MergeOperationInputV3& merge_in,
MergeOperationOutputV3* merge_out) const {
assert(merge_out);
MergeOperationInput in_v2(merge_in.key, nullptr, merge_in.operand_list,
merge_in.logger);
std::string new_value;
Slice existing_operand(nullptr, 0);
MergeOperationOutput out_v2(new_value, existing_operand);
return std::visit(
overload{
[&](const auto& existing) -> bool {
using T = std::decay_t<decltype(existing)>;
if constexpr (std::is_same_v<T, Slice>) {
in_v2.existing_value = &existing;
}
const bool result = FullMergeV2(in_v2, &out_v2);
if (!result) {
merge_out->op_failure_scope = out_v2.op_failure_scope;
return false;
}
if (existing_operand.data()) {
merge_out->new_value = existing_operand;
} else {
merge_out->new_value = std::move(new_value);
}
return true;
},
[&](const WideColumns& existing_columns) -> bool {
const bool has_default_column =
!existing_columns.empty() &&
existing_columns.front().name() == kDefaultWideColumnName;
Slice value_of_default;
if (has_default_column) {
value_of_default = existing_columns.front().value();
}
in_v2.existing_value = &value_of_default;
const bool result = FullMergeV2(in_v2, &out_v2);
if (!result) {
merge_out->op_failure_scope = out_v2.op_failure_scope;
return false;
}
merge_out->new_value = MergeOperationOutputV3::NewColumns();
auto& new_columns = std::get<MergeOperationOutputV3::NewColumns>(
merge_out->new_value);
new_columns.reserve(has_default_column
? existing_columns.size()
: (existing_columns.size() + 1));
if (existing_operand.data()) {
new_columns.emplace_back(kDefaultWideColumnName.ToString(),
existing_operand.ToString());
} else {
new_columns.emplace_back(kDefaultWideColumnName.ToString(),
std::move(new_value));
}
for (size_t i = has_default_column ? 1 : 0;
i < existing_columns.size(); ++i) {
new_columns.emplace_back(existing_columns[i].name().ToString(),
existing_columns[i].value().ToString());
}
return true;
}},
merge_in.existing_value);
}
// The default implementation of PartialMergeMulti, which invokes
// PartialMerge multiple times internally and merges two operands at
// a time.

View File

@ -18,6 +18,7 @@
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/wide_columns.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "utilities/merge_operators.h"
@ -607,6 +608,272 @@ TEST_F(MergeTest, MergeWithCompactionAndFlush) {
ASSERT_OK(DestroyDB(dbname, Options()));
}
TEST_F(MergeTest, FullMergeV3FallbackNewValue) {
// Test that the default FullMergeV3 implementation correctly handles the case
// when FullMergeV2 results in a new value.
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
auto append_operator =
MergeOperators::CreateStringAppendOperator(std::string());
// No existing value
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<std::string>(merge_out.new_value);
ASSERT_EQ(result, operands[0].ToString() + operands[1].ToString() +
operands[2].ToString());
}
// Plain existing value
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<std::string>(merge_out.new_value);
ASSERT_EQ(result, plain.ToString() + operands[0].ToString() +
operands[1].ToString() + operands[2].ToString());
}
// Wide-column existing value with default column
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size());
ASSERT_EQ(result[0].first, entity[0].name());
ASSERT_EQ(result[0].second,
entity[0].value().ToString() + operands[0].ToString() +
operands[1].ToString() + operands[2].ToString());
ASSERT_EQ(result[1].first, entity[1].name());
ASSERT_EQ(result[1].second, entity[1].value());
ASSERT_EQ(result[2].first, entity[2].name());
ASSERT_EQ(result[2].second, entity[2].value());
}
// Wide-column existing value without default column
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size() + 1);
ASSERT_EQ(result[0].first, kDefaultWideColumnName);
ASSERT_EQ(result[0].second, operands[0].ToString() +
operands[1].ToString() +
operands[2].ToString());
ASSERT_EQ(result[1].first, entity[0].name());
ASSERT_EQ(result[1].second, entity[0].value());
ASSERT_EQ(result[2].first, entity[1].name());
ASSERT_EQ(result[2].second, entity[1].value());
}
}
TEST_F(MergeTest, FullMergeV3FallbackExistingOperand) {
// Test that the default FullMergeV3 implementation correctly handles the case
// when FullMergeV2 results in an existing operand.
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
auto put_operator = MergeOperators::CreatePutOperator();
// No existing value
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<Slice>(merge_out.new_value);
ASSERT_EQ(result.data(), operands.back().data());
ASSERT_EQ(result.size(), operands.back().size());
}
// Plain existing value
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result = std::get<Slice>(merge_out.new_value);
ASSERT_EQ(result.data(), operands.back().data());
ASSERT_EQ(result.size(), operands.back().size());
}
// Wide-column existing value with default column
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size());
ASSERT_EQ(result[0].first, entity[0].name());
ASSERT_EQ(result[0].second, operands.back());
ASSERT_EQ(result[1].first, entity[1].name());
ASSERT_EQ(result[1].second, entity[1].value());
ASSERT_EQ(result[2].first, entity[2].name());
ASSERT_EQ(result[2].second, entity[2].value());
}
// Wide-column existing value without default column
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
const auto& result =
std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
merge_out.new_value);
ASSERT_EQ(result.size(), entity.size() + 1);
ASSERT_EQ(result[0].first, kDefaultWideColumnName);
ASSERT_EQ(result[0].second, operands.back());
ASSERT_EQ(result[1].first, entity[0].name());
ASSERT_EQ(result[1].second, entity[0].value());
ASSERT_EQ(result[2].first, entity[1].name());
ASSERT_EQ(result[2].second, entity[1].value());
}
}
TEST_F(MergeTest, FullMergeV3FallbackFailure) {
// Test that the default FullMergeV3 implementation correctly handles the case
// when FullMergeV2 fails.
const Slice key("foo");
const MergeOperator::MergeOperationInputV3::OperandList operands{
"first", "second", "third"};
constexpr Logger* logger = nullptr;
class FailMergeOperator : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& /* merge_in */,
MergeOperationOutput* merge_out) const override {
assert(merge_out);
merge_out->op_failure_scope = OpFailureScope::kMustMerge;
return false;
}
const char* Name() const override { return "FailMergeOperator"; }
};
FailMergeOperator fail_operator;
// No existing value
{
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
// Plain existing value
{
const Slice plain("plain");
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
// Wide-column existing value with default column
{
const WideColumns entity{
{kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
// Wide-column existing value without default column
{
const WideColumns entity{{"one", "1"}, {"two", "2"}};
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
const MergeOperator::MergeOperationInputV3 merge_in(
key, std::move(existing_value), operands, logger);
MergeOperator::MergeOperationOutputV3 merge_out;
ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
ASSERT_EQ(merge_out.op_failure_scope,
MergeOperator::OpFailureScope::kMustMerge);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -8,10 +8,13 @@
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <variant>
#include <vector>
#include "rocksdb/customizable.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
@ -158,6 +161,55 @@ class MergeOperator : public Customizable {
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const;
struct MergeOperationInputV3 {
using ExistingValue = std::variant<std::monostate, Slice, WideColumns>;
using OperandList = std::vector<Slice>;
explicit MergeOperationInputV3(const Slice& _key,
ExistingValue&& _existing_value,
const OperandList& _operand_list,
Logger* _logger)
: key(_key),
existing_value(std::move(_existing_value)),
operand_list(_operand_list),
logger(_logger) {}
// The user key, including the user-defined timestamp if applicable.
const Slice& key;
// The base value of the merge operation. Can be one of three things (see
// the ExistingValue variant above): no existing value, plain existing
// value, or wide-column existing value.
ExistingValue existing_value;
// The list of operands to apply.
const OperandList& operand_list;
// The logger to use in case a failure happens during the merge operation.
Logger* logger;
};
struct MergeOperationOutputV3 {
using NewColumns = std::vector<std::pair<std::string, std::string>>;
using NewValue = std::variant<std::string, NewColumns, Slice>;
// The result of the merge operation. Can be one of three things (see the
// NewValue variant above): a new plain value, a new wide-column value, or
// an existing merge operand.
NewValue new_value;
// The scope of the failure if applicable. See above for more details.
OpFailureScope op_failure_scope = OpFailureScope::kDefault;
};
// ************************** UNDER CONSTRUCTION *****************************
// An extended version of FullMergeV2() that supports wide columns on both the
// input and the output side, enabling the application to perform general
// transformations during merges. For backward compatibility, the default
// implementation calls FullMergeV2(). Specifically, if there is no base value
// or the base value is a plain key-value, the default implementation falls
// back to FullMergeV2(). If the base value is a wide-column entity, the
// default implementation invokes FullMergeV2() to perform the merge on the
// default column, and leaves any other columns unchanged.
virtual bool FullMergeV3(const MergeOperationInputV3& merge_in,
MergeOperationOutputV3* merge_out) const;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types
// that you would have passed to a DB::Merge() call in the same order

23
util/overload.h Normal file
View File

@ -0,0 +1,23 @@
// (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
// A helper template that can combine multiple functors into a single one to be
// used with std::visit for example. It also works with lambdas, since it
// comes with an explicit deduction guide.
template <typename... Ts>
struct overload : Ts... {
using Ts::operator()...;
};
template <typename... Ts>
overload(Ts...) -> overload<Ts...>;
} // namespace ROCKSDB_NAMESPACE