mirror of https://github.com/facebook/rocksdb.git
Eliminate std::deque initialization while iterating over merge operands
Summary: This patch is similar to D52563, When we iterate over a DB with merge operands we keep creating std::queue to store the operands, optimize this by reusing merge_operands_ data member Before the patch ``` ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq" --db="/dev/shm/bench_merge_memcpy_on_the_fly/" --merge_operator="put" --merge_keys=10000 --num=10000 DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.757 micros/op 266141 ops/sec; 29.4 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.413 micros/op 2423538 ops/sec; 268.1 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.451 micros/op 2219071 ops/sec; 245.5 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.420 micros/op 2382039 ops/sec; 263.5 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.408 micros/op 2452017 ops/sec; 271.3 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.947 micros/op 253376 ops/sec; 28.0 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.441 micros/op 2266473 ops/sec; 250.7 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.471 micros/op 2122033 ops/sec; 234.8 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.440 micros/op 2271407 ops/sec; 251.3 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.429 micros/op 2331471 ops/sec; 257.9 MB/s ``` with the patch ``` ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq" --db="/dev/shm/bench_merge_memcpy_on_the_fly/" --merge_operator="put" --merge_keys=10000 --num=10000 DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 4.080 micros/op 245092 ops/sec; 27.1 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.308 micros/op 3241843 ops/sec; 358.6 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.312 micros/op 3200408 ops/sec; 354.0 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.332 micros/op 3013962 ops/sec; 333.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.300 micros/op 3328017 ops/sec; 368.2 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] mergerandom : 3.973 micros/op 251705 ops/sec; 27.8 MB/s ( updates:10000) DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.320 micros/op 3123752 ops/sec; 345.6 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.335 micros/op 2986641 ops/sec; 330.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.339 micros/op 2950047 ops/sec; 326.4 MB/s DB path: [/dev/shm/bench_merge_memcpy_on_the_fly/] readseq : 0.319 micros/op 3131565 ops/sec; 346.4 MB/s ``` Test Plan: make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D56031
This commit is contained in:
parent
f38540b12a
commit
8a1a603fdb
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "db/dbformat.h"
|
||||
#include "db/filename.h"
|
||||
#include "db/merge_context.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
|
@ -251,7 +252,7 @@ class DBIter: public Iterator {
|
|||
bool prefix_same_as_start_;
|
||||
bool iter_pinned_;
|
||||
// List of operands for merge operator.
|
||||
std::deque<std::string> merge_operands_;
|
||||
MergeContext merge_context_;
|
||||
LocalStatistics local_stats_;
|
||||
|
||||
// No copying allowed
|
||||
|
@ -411,9 +412,9 @@ void DBIter::MergeValuesNewToOld() {
|
|||
return;
|
||||
}
|
||||
|
||||
merge_context_.Clear();
|
||||
// Start the merge process by pushing the first operand
|
||||
std::deque<std::string> operands;
|
||||
operands.push_front(iter_->value().ToString());
|
||||
merge_context_.PushOperand(iter_->value());
|
||||
|
||||
ParsedInternalKey ikey;
|
||||
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
|
||||
|
@ -438,7 +439,8 @@ void DBIter::MergeValuesNewToOld() {
|
|||
{
|
||||
StopWatchNano timer(env_, statistics_ != nullptr);
|
||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||
user_merge_operator_->FullMerge(ikey.user_key, &val, operands,
|
||||
user_merge_operator_->FullMerge(ikey.user_key, &val,
|
||||
merge_context_.GetOperands(),
|
||||
&saved_value_, logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
|
||||
timer.ElapsedNanos());
|
||||
|
@ -450,7 +452,7 @@ void DBIter::MergeValuesNewToOld() {
|
|||
// hit a merge, add the value as an operand and run associative merge.
|
||||
// when complete, add result to operands and continue.
|
||||
const Slice& val = iter_->value();
|
||||
operands.push_front(val.ToString());
|
||||
merge_context_.PushOperand(val);
|
||||
} else {
|
||||
assert(false);
|
||||
}
|
||||
|
@ -463,8 +465,9 @@ void DBIter::MergeValuesNewToOld() {
|
|||
// a deletion marker.
|
||||
// feed null as the existing value to the merge operator, such that
|
||||
// client can differentiate this scenario and do things accordingly.
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
|
||||
&saved_value_, logger_);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
|
||||
merge_context_.GetOperands(), &saved_value_,
|
||||
logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
|
||||
}
|
||||
}
|
||||
|
@ -556,7 +559,7 @@ void DBIter::PrevInternal() {
|
|||
// saved_value_
|
||||
bool DBIter::FindValueForCurrentKey() {
|
||||
assert(iter_->Valid());
|
||||
merge_operands_.clear();
|
||||
merge_context_.Clear();
|
||||
// last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
|
||||
// kTypeValue)
|
||||
ValueType last_not_merge_type = kTypeDeletion;
|
||||
|
@ -576,19 +579,19 @@ bool DBIter::FindValueForCurrentKey() {
|
|||
last_key_entry_type = ikey.type;
|
||||
switch (last_key_entry_type) {
|
||||
case kTypeValue:
|
||||
merge_operands_.clear();
|
||||
merge_context_.Clear();
|
||||
saved_value_ = iter_->value().ToString();
|
||||
last_not_merge_type = kTypeValue;
|
||||
break;
|
||||
case kTypeDeletion:
|
||||
case kTypeSingleDeletion:
|
||||
merge_operands_.clear();
|
||||
merge_context_.Clear();
|
||||
last_not_merge_type = last_key_entry_type;
|
||||
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
|
||||
break;
|
||||
case kTypeMerge:
|
||||
assert(user_merge_operator_ != nullptr);
|
||||
merge_operands_.push_back(iter_->value().ToString());
|
||||
merge_context_.PushOperandBack(iter_->value());
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
|
@ -611,8 +614,8 @@ bool DBIter::FindValueForCurrentKey() {
|
|||
StopWatchNano timer(env_, statistics_ != nullptr);
|
||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
|
||||
merge_operands_, &saved_value_,
|
||||
logger_);
|
||||
merge_context_.GetOperands(),
|
||||
&saved_value_, logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
|
||||
timer.ElapsedNanos());
|
||||
} else {
|
||||
|
@ -623,8 +626,8 @@ bool DBIter::FindValueForCurrentKey() {
|
|||
StopWatchNano timer(env_, statistics_ != nullptr);
|
||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
|
||||
merge_operands_, &saved_value_,
|
||||
logger_);
|
||||
merge_context_.GetOperands(),
|
||||
&saved_value_, logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
|
||||
timer.ElapsedNanos());
|
||||
}
|
||||
|
@ -667,11 +670,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
|
|||
|
||||
// kTypeMerge. We need to collect all kTypeMerge values and save them
|
||||
// in operands
|
||||
std::deque<std::string> operands;
|
||||
merge_context_.Clear();
|
||||
while (iter_->Valid() &&
|
||||
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
|
||||
ikey.type == kTypeMerge) {
|
||||
operands.push_front(iter_->value().ToString());
|
||||
merge_context_.PushOperand(iter_->value());
|
||||
iter_->Next();
|
||||
FindParseableKey(&ikey, kForward);
|
||||
}
|
||||
|
@ -682,7 +685,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
|
|||
{
|
||||
StopWatchNano timer(env_, statistics_ != nullptr);
|
||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr,
|
||||
merge_context_.GetOperands(),
|
||||
&saved_value_, logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
|
||||
}
|
||||
|
@ -700,8 +704,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
|
|||
{
|
||||
StopWatchNano timer(env_, statistics_ != nullptr);
|
||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), &val, operands,
|
||||
&saved_value_, logger_);
|
||||
user_merge_operator_->FullMerge(saved_key_.GetKey(), &val,
|
||||
merge_context_.GetOperands(), &saved_value_,
|
||||
logger_);
|
||||
RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanos());
|
||||
}
|
||||
valid_ = true;
|
||||
|
|
|
@ -37,6 +37,11 @@ public:
|
|||
Initialize();
|
||||
operand_list->push_front(operand_slice.ToString());
|
||||
}
|
||||
// Push back a merge operand
|
||||
void PushOperandBack(const Slice& operand_slice) {
|
||||
Initialize();
|
||||
operand_list->push_back(operand_slice.ToString());
|
||||
}
|
||||
// return total number of operands in the list
|
||||
size_t GetNumOperands() const {
|
||||
if (!operand_list) {
|
||||
|
|
Loading…
Reference in New Issue