Add `ContinueCallback` to `GetMergeOperands()` (#12438)

Summary:
The use case is similar to `MergeOperator::ShouldMerge()` for `Get()`: preventing reads into LSM components for merge operands that are of no interest to the user. `MergeOperator::ShouldMerge()` cannot be reused here because:

- Its name does not make sense in the context of `GetMergeOperands()` since `GetMergeOperands()` never invokes merge
- The callback is part of the `MergeOperator`, but an option specific to the read operation makes more sense to me

If there are any ideas for an API design that covers both `MergeOperator::ShouldMerge()`'s use cases and `GetMergeOperandsOptions::continue_cb`'s use cases, that would be ideal, but for now this is what I came up with.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12438

Reviewed By: hx235

Differential Revision: D54914669

Pulled By: ajkr

fbshipit-source-id: 5f3ff78d3890adc0b1b74bedf3921221930ce63a
This commit is contained in:
Andrew Kryczka 2024-03-15 12:25:49 -07:00 committed by Facebook GitHub Bot
parent c3c0cfc3a8
commit 3f5bd46a07
7 changed files with 145 additions and 4 deletions

View File

@ -2323,6 +2323,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// Prepare to store a list of merge operations if merge occurs. // Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context; MergeContext merge_context;
merge_context.get_merge_operands_options =
get_impl_options.get_merge_operands_options;
SequenceNumber max_covering_tombstone_seq = 0; SequenceNumber max_covering_tombstone_seq = 0;
Status s; Status s;

View File

@ -430,6 +430,111 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) {
} }
} }
TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInMemtable) {
const int kNumOperands = 10;
const int kNumOperandsToFetch = 5;
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
Random rnd(301);
std::vector<std::string> expected_merge_operands;
expected_merge_operands.reserve(kNumOperands);
for (int i = 0; i < kNumOperands; ++i) {
expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
ASSERT_OK(Merge("key", expected_merge_operands.back()));
}
std::vector<PinnableSlice> merge_operands(kNumOperands);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = kNumOperands;
int num_fetched = 0;
merge_operands_info.continue_cb = [&](Slice /* value */) {
num_fetched++;
return num_fetched != kNumOperandsToFetch;
};
int num_merge_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"key", merge_operands.data(),
&merge_operands_info, &num_merge_operands));
ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
ASSERT_EQ(kNumOperandsToFetch, num_fetched);
for (int i = 0; i < kNumOperandsToFetch; ++i) {
ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
merge_operands[i]);
}
}
TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitBaseValue) {
// The continuation callback doesn't need to be called on a base value because
// there's no remaining work to be saved.
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
Random rnd(301);
std::string expected_value = rnd.RandomString(7 /* len */);
ASSERT_OK(Put("key", expected_value));
std::vector<PinnableSlice> merge_operands(1);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = 1;
int num_fetched = 0;
merge_operands_info.continue_cb = [&num_fetched](Slice /* value */) {
num_fetched++;
return true;
};
int num_merge_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"key", merge_operands.data(),
&merge_operands_info, &num_merge_operands));
ASSERT_EQ(1, num_merge_operands);
ASSERT_EQ(0, num_fetched);
ASSERT_EQ(expected_value, merge_operands[0]);
}
TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInSst) {
const int kNumOperands = 10;
const int kNumOperandsToFetch = 5;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
Random rnd(301);
std::vector<std::string> expected_merge_operands;
expected_merge_operands.reserve(kNumOperands);
for (int i = 0; i < kNumOperands; ++i) {
expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
ASSERT_OK(Merge("key", expected_merge_operands.back()));
ASSERT_OK(Flush());
}
std::vector<PinnableSlice> merge_operands(kNumOperands);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = kNumOperands;
int num_fetched = 0;
merge_operands_info.continue_cb = [&](Slice /* value */) {
num_fetched++;
return num_fetched != kNumOperandsToFetch;
};
int num_merge_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"key", merge_operands.data(),
&merge_operands_info, &num_merge_operands));
ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
ASSERT_EQ(kNumOperandsToFetch, num_fetched);
for (int i = 0; i < kNumOperandsToFetch; ++i) {
ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
merge_operands[i]);
}
}
TEST_F(DBMergeOperandTest, GetMergeOperandsBaseDeletionInImmMem) { TEST_F(DBMergeOperandTest, GetMergeOperandsBaseDeletionInImmMem) {
// In this test, "k1" has a MERGE in a mutable memtable on top of a base // In this test, "k1" has a MERGE in a mutable memtable on top of a base
// DELETE in an immutable memtable. // DELETE in an immutable memtable.

View File

@ -1207,6 +1207,14 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->found_final_value) = true; *(s->found_final_value) = true;
return false; return false;
} }
if (merge_context->get_merge_operands_options != nullptr &&
merge_context->get_merge_operands_options->continue_cb != nullptr &&
!merge_context->get_merge_operands_options->continue_cb(v)) {
// We were told not to continue.
*(s->found_final_value) = true;
return false;
}
return true; return true;
} }
default: { default: {

View File

@ -9,6 +9,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "rocksdb/db.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -21,6 +22,8 @@ const std::vector<Slice> empty_operand_list;
// will be fetched from the context when issuing partial of full merge. // will be fetched from the context when issuing partial of full merge.
class MergeContext { class MergeContext {
public: public:
GetMergeOperandsOptions* get_merge_operands_options = nullptr;
// Clear all the operands // Clear all the operands
void Clear() { void Clear() {
if (operand_list_) { if (operand_list_) {

View File

@ -136,12 +136,26 @@ struct IngestExternalFileArg {
}; };
struct GetMergeOperandsOptions { struct GetMergeOperandsOptions {
using ContinueCallback = std::function<bool(Slice)>;
// A limit on the number of merge operands returned by the GetMergeOperands() // A limit on the number of merge operands returned by the GetMergeOperands()
// API. In contrast with ReadOptions::merge_operator_max_count, this is a hard // API. In contrast with ReadOptions::merge_operator_max_count, this is a hard
// limit: when it is exceeded, no merge operands will be returned and the // limit: when it is exceeded, no merge operands will be returned and the
// query will fail with an Incomplete status. See also the // query will fail with an Incomplete status. See also the
// DB::GetMergeOperands() API below. // DB::GetMergeOperands() API below.
int expected_max_number_of_operands = 0; int expected_max_number_of_operands = 0;
// `continue_cb` will be called after reading each merge operand, excluding
// any base value. Operands are read in order from newest to oldest. The
// operand value is provided as an argument.
//
// Returning false will end the lookup process at the merge operand on which
// `continue_cb` was just invoked. Returning true allows the lookup to
// continue.
//
// If it is nullptr, `GetMergeOperands()` will behave as if it always returned
// true (continue fetching merge operands until there are no more).
ContinueCallback continue_cb;
}; };
// A collections of table properties objects, where // A collections of table properties objects, where
@ -643,11 +657,12 @@ class DB {
} }
// Populates the `merge_operands` array with all the merge operands in the DB // Populates the `merge_operands` array with all the merge operands in the DB
// for `key`. The `merge_operands` array will be populated in the order of // for `key`, or a customizable suffix of merge operands when
// insertion. The number of entries populated in `merge_operands` will be // `GetMergeOperandsOptions::continue_cb` is set. The `merge_operands` array
// assigned to `*number_of_operands`. // will be populated in the order of insertion. The number of entries
// populated in `merge_operands` will be assigned to `*number_of_operands`.
// //
// If the number of merge operands in DB for `key` is greater than // If the number of merge operands to return for `key` is greater than
// `merge_operands_options.expected_max_number_of_operands`, // `merge_operands_options.expected_max_number_of_operands`,
// `merge_operands` is not populated and the return value is // `merge_operands` is not populated and the return value is
// `Status::Incomplete`. In that case, `*number_of_operands` will be assigned // `Status::Incomplete`. In that case, `*number_of_operands` will be assigned

View File

@ -467,6 +467,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
MergeWithNoBaseValue(); MergeWithNoBaseValue();
return false; return false;
} }
if (merge_context_->get_merge_operands_options != nullptr &&
merge_context_->get_merge_operands_options->continue_cb !=
nullptr &&
!merge_context_->get_merge_operands_options->continue_cb(value)) {
state_ = kFound;
return false;
}
return true; return true;
default: default:

View File

@ -0,0 +1 @@
* Added an option, `GetMergeOperandsOptions::continue_cb`, to give users the ability to end `GetMergeOperands()`'s lookup process before all merge operands were found.