Handle Merges correctly in GetEntity (#10894)

Summary:
The PR fixes the handling of `Merge`s in `GetEntity`. Note that `Merge` is not yet
supported for wide-column entities written using `PutEntity`; this change is
about returning correct (i.e. consistent with `Get`) results in cases like when the
base value is a plain old key-value written using `Put` or when there is no real base
value because we hit either a tombstone or the beginning of history.

Implementation-wise, the patch introduces a new wrapper around the existing
`MergeHelper::TimedFullMerge` that can store the merge result in either a string
(for the purposes of `Get`) or a `PinnableWideColumns` instance (for `GetEntity`).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10894

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D40782708

Pulled By: ltamasi

fbshipit-source-id: 3d700d56b2ef81f02ba1e2d93f6481bf13abcc90
This commit is contained in:
Levi Tamasi 2022-10-28 10:48:51 -07:00 committed by Facebook GitHub Bot
parent 1e6f1ef894
commit 7867a1112b
6 changed files with 218 additions and 34 deletions

View File

@ -1064,20 +1064,10 @@ static bool SaveValue(void* arg, const char* entry) {
assert(s->do_merge);
if (s->value || s->columns) {
std::string result;
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), &result, s->logger, s->statistics,
s->clock, nullptr /* result_operand */, true);
if (s->status->ok()) {
if (s->value) {
*(s->value) = std::move(result);
} else {
assert(s->columns);
s->columns->SetPlainValue(result);
}
}
merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true);
}
} else if (s->value) {
s->value->assign(v.data(), v.size());
@ -1148,10 +1138,10 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
if (s->value || s->columns) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger,
merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true);
}
} else {
@ -1177,10 +1167,13 @@ static bool SaveValue(void* arg, const char* entry) {
v, s->inplace_update_support == false /* operand_pinned */);
if (s->do_merge && merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
s->clock, nullptr /* result_operand */, true);
if (s->value || s->columns) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->columns, s->logger,
s->statistics, s->clock, nullptr /* result_operand */, true);
}
*(s->found_final_value) = true;
return false;
}

View File

@ -110,6 +110,36 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
return Status::OK();
}
Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands,
std::string* value,
PinnableWideColumns* columns, Logger* logger,
Statistics* statistics, SystemClock* clock,
Slice* result_operand,
bool update_num_ops_stats) {
assert(value || columns);
assert(!value || !columns);
std::string result;
const Status s =
TimedFullMerge(merge_operator, key, base_value, operands, &result, logger,
statistics, clock, result_operand, update_num_ops_stats);
if (!s.ok()) {
return s;
}
if (value) {
*value = std::move(result);
return Status::OK();
}
assert(columns);
columns->SetPlainValue(result);
return Status::OK();
}
// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// keys_, operands_ are updated to reflect the merge result.

View File

@ -56,6 +56,15 @@ class MergeHelper {
Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* base_value,
const std::vector<Slice>& operands,
std::string* value, PinnableWideColumns* columns,
Logger* logger, Statistics* statistics,
SystemClock* clock,
Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
// Merge entries until we hit
// - a corrupted key
// - a Put/Delete,

View File

@ -2403,12 +2403,16 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, info_log_, db_statistics_, clock_,
nullptr /* result_operand */, true);
if (LIKELY(value != nullptr)) {
value->PinSelf();
if (str_value || columns) {
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, merge_context->GetOperands(),
str_value, columns, info_log_, db_statistics_, clock_,
nullptr /* result_operand */, true);
if (status->ok()) {
if (LIKELY(value != nullptr)) {
value->PinSelf();
}
}
}
} else {
if (key_exists != nullptr) {

View File

@ -209,6 +209,148 @@ TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
TEST_F(DBWideBasicTest, MergePlainKeyValue) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);
// Put + Merge
constexpr char first_key[] = "first";
constexpr char first_base_value[] = "hello";
constexpr char first_merge_op[] = "world";
// Delete + Merge
constexpr char second_key[] = "second";
constexpr char second_merge_op[] = "foo";
// Merge without any preceding KV
constexpr char third_key[] = "third";
constexpr char third_merge_op[] = "bar";
auto write_base = [&]() {
// Write "base" KVs: a Put for the 1st key and a Delete for the 2nd one;
// note there is no "base" KV for the 3rd
ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), first_key,
first_base_value));
ASSERT_OK(
db_->Delete(WriteOptions(), db_->DefaultColumnFamily(), second_key));
};
auto write_merge = [&]() {
// Write Merge operands
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
first_merge_op));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
second_merge_op));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), third_key,
third_merge_op));
};
const std::string expected_first_column(std::string(first_base_value) + "," +
first_merge_op);
const WideColumns expected_first_columns{
{kDefaultWideColumnName, expected_first_column}};
const WideColumns expected_second_columns{
{kDefaultWideColumnName, second_merge_op}};
const WideColumns expected_third_columns{
{kDefaultWideColumnName, third_merge_op}};
auto verify = [&]() {
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result));
ASSERT_EQ(result.columns(), expected_first_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result));
ASSERT_EQ(result.columns(), expected_second_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
third_key, &result));
ASSERT_EQ(result.columns(), expected_third_columns);
}
{
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), expected_first_columns[0].value());
ASSERT_EQ(iter->columns(), expected_first_columns);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_EQ(iter->value(), expected_second_columns[0].value());
ASSERT_EQ(iter->columns(), expected_second_columns);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), third_key);
ASSERT_EQ(iter->value(), expected_third_columns[0].value());
ASSERT_EQ(iter->columns(), expected_third_columns);
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), third_key);
ASSERT_EQ(iter->value(), expected_third_columns[0].value());
ASSERT_EQ(iter->columns(), expected_third_columns);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_EQ(iter->value(), expected_second_columns[0].value());
ASSERT_EQ(iter->columns(), expected_second_columns);
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), expected_first_columns[0].value());
ASSERT_EQ(iter->columns(), expected_first_columns);
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
};
// Base KVs (if any) and Merge operands both in memtable
write_base();
write_merge();
verify();
// Base KVs (if any) and Merge operands both in storage
ASSERT_OK(Flush());
verify();
// Base KVs (if any) in storage, Merge operands in memtable
DestroyAndReopen(options);
write_base();
ASSERT_OK(Flush());
write_merge();
verify();
}
TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) {
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();

View File

@ -407,7 +407,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kDeleted;
} else if (kMerge == state_) {
state_ = kFound;
Merge(nullptr);
if (do_merge_) {
Merge(nullptr);
}
// If do_merge_ = false then the current value shouldn't be part of
// merge_context_->operand_list
}
@ -438,16 +440,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
void GetContext::Merge(const Slice* value) {
assert(do_merge_);
assert(!pinnable_val_ || !columns_);
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(),
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_, logger_,
statistics_, clock_);
if (!s.ok()) {
state_ = kCorrupt;
return;
}
if (LIKELY(pinnable_val_ != nullptr)) {
if (do_merge_) {
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, value, merge_context_->GetOperands(),
pinnable_val_->GetSelf(), logger_, statistics_, clock_);
pinnable_val_->PinSelf();
if (!merge_status.ok()) {
state_ = kCorrupt;
}
}
pinnable_val_->PinSelf();
}
}