Add readwhilemerging benchmark

Summary:
This is like readwhilewriting but uses Merge rather than Put in the writer thread.
I am using it for in-progress benchmarks. I don't think the other benchmarks for Merge
cover this behavior. The purpose of this test is to measure read performance when
readers might have to merge results. This will also benefit from work-in-progress
to add skewed key generation.

Task ID: #

Blame Rev:

Test Plan:
Revert Plan:

Database Impact:

Memcache Impact:

Other Notes:

Important:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Reviewers: igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D35115
This commit is contained in:
Mark Callaghan 2015-03-18 13:50:52 -07:00
parent 230e68727a
commit dfccc7b4e2
2 changed files with 73 additions and 15 deletions

View file

@ -79,6 +79,7 @@ DEFINE_string(benchmarks,
"readtocache," "readtocache,"
"readreverse," "readreverse,"
"readwhilewriting," "readwhilewriting,"
"readwhilemerging,"
"readrandomwriterandom," "readrandomwriterandom,"
"updaterandom," "updaterandom,"
"randomwithverify," "randomwithverify,"
@ -113,6 +114,8 @@ DEFINE_string(benchmarks,
"of DB\n" "of DB\n"
"\treadwhilewriting -- 1 writer, N threads doing random " "\treadwhilewriting -- 1 writer, N threads doing random "
"reads\n" "reads\n"
"\treadwhilemerging -- 1 merger, N threads doing random "
"reads\n"
"\treadrandomwriterandom -- N threads doing random-read, " "\treadrandomwriterandom -- N threads doing random-read, "
"random-write\n" "random-write\n"
"\tprefixscanrandom -- prefix scan N times in random order\n" "\tprefixscanrandom -- prefix scan N times in random order\n"
@ -180,6 +183,10 @@ DEFINE_int32(seek_nexts, 0,
"How many times to call Next() after Seek() in " "How many times to call Next() after Seek() in "
"fillseekseq and seekrandom"); "fillseekseq and seekrandom");
DEFINE_bool(reverse_iterator, false,
"When true use Prev rather than Next for iterators that do "
"Seek and then Next");
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
DEFINE_int64(batch_size, 1, "Batch size"); DEFINE_int64(batch_size, 1, "Batch size");
@ -316,8 +323,9 @@ static class std::shared_ptr<rocksdb::Statistics> dbstats;
DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do" DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
" --num reads."); " --num reads.");
DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes per second." DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes and merges "
" No limit when <= 0. Only for the readwhilewriting test."); " per second. No limit when <= 0. Only for the readwhilewriting "
" and readwhilemerging tests.");
DEFINE_bool(sync, false, "Sync all writes to disk"); DEFINE_bool(sync, false, "Sync all writes to disk");
@ -560,6 +568,11 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
"table becomes an identity function. This is only valid when key " "table becomes an identity function. This is only valid when key "
"is 8 bytes"); "is 8 bytes");
enum PutOrMerge {
kPut,
kMerge
};
enum RepFactory { enum RepFactory {
kSkipList, kSkipList,
kPrefixHash, kPrefixHash,
@ -1588,6 +1601,9 @@ class Benchmark {
} else if (name == Slice("readwhilewriting")) { } else if (name == Slice("readwhilewriting")) {
num_threads++; // Add extra thread for writing num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileWriting; method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("readwhilemerging")) {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileMerging;
} else if (name == Slice("readrandomwriterandom")) { } else if (name == Slice("readrandomwriterandom")) {
method = &Benchmark::ReadRandomWriteRandom; method = &Benchmark::ReadRandomWriteRandom;
} else if (name == Slice("readrandommergerandom")) { } else if (name == Slice("readrandommergerandom")) {
@ -2605,7 +2621,7 @@ class Benchmark {
if (thread->tid > 0) { if (thread->tid > 0) {
IteratorCreation(thread); IteratorCreation(thread);
} else { } else {
BGWriter(thread); BGWriter(thread, kPut);
} }
} }
@ -2668,7 +2684,12 @@ class Benchmark {
Slice value = iter_to_use->value(); Slice value = iter_to_use->value();
memcpy(value_buffer, value.data(), memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer))); std::min(value.size(), sizeof(value_buffer)));
if (!FLAGS_reverse_iterator) {
iter_to_use->Next(); iter_to_use->Next();
} else {
iter_to_use->Prev();
}
assert(iter_to_use->status().ok()); assert(iter_to_use->status().ok());
} }
@ -2692,7 +2713,7 @@ class Benchmark {
if (thread->tid > 0) { if (thread->tid > 0) {
SeekRandom(thread); SeekRandom(thread);
} else { } else {
BGWriter(thread); BGWriter(thread, kPut);
} }
} }
@ -2733,11 +2754,19 @@ class Benchmark {
if (thread->tid > 0) { if (thread->tid > 0) {
ReadRandom(thread); ReadRandom(thread);
} else { } else {
BGWriter(thread); BGWriter(thread, kPut);
} }
} }
void BGWriter(ThreadState* thread) { void ReadWhileMerging(ThreadState* thread) {
if (thread->tid > 0) {
ReadRandom(thread);
} else {
BGWriter(thread, kMerge);
}
}
void BGWriter(ThreadState* thread, enum PutOrMerge write_merge) {
// Special thread that keeps writing until other threads are done. // Special thread that keeps writing until other threads are done.
RandomGenerator gen; RandomGenerator gen;
double last = FLAGS_env->NowMicros(); double last = FLAGS_env->NowMicros();
@ -2767,9 +2796,16 @@ class Benchmark {
} }
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s = db->Put(write_options_, key, gen.Generate(value_size_)); Status s;
if (write_merge == kPut) {
s = db->Put(write_options_, key, gen.Generate(value_size_));
} else {
s = db->Merge(write_options_, key, gen.Generate(value_size_));
}
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
thread->stats.FinishedOps(&db_, db_.db, 1); thread->stats.FinishedOps(&db_, db_.db, 1);
@ -3207,7 +3243,11 @@ class Benchmark {
thread->stats.FinishedOps(nullptr, db, 1); thread->stats.FinishedOps(nullptr, db, 1);
for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) { for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
if (!FLAGS_reverse_iterator) {
iter->Next(); iter->Next();
} else {
iter->Prev();
}
GenerateKeyFromInt(++i, FLAGS_num, &key); GenerateKeyFromInt(++i, FLAGS_num, &key);
assert(iter->Valid() && iter->key() == key); assert(iter->Valid() && iter->key() == key);
thread->stats.FinishedOps(nullptr, db, 1); thread->stats.FinishedOps(nullptr, db, 1);

View file

@ -3,7 +3,7 @@
if [ $# -ne 1 ]; then if [ $# -ne 1 ]; then
echo -n "./benchmark.sh [bulkload/fillseq/overwrite/filluniquerandom/" echo -n "./benchmark.sh [bulkload/fillseq/overwrite/filluniquerandom/"
echo "readrandom/readwhilewriting/updaterandom/mergerandom]" echo "readrandom/readwhilewriting/readwhilemerging/updaterandom/mergerandom]"
exit 0 exit 0
fi fi
@ -36,7 +36,8 @@ if [ ! -z $DB_BENCH_NO_SYNC ]; then
fi fi
num_read_threads=${NUM_READ_THREADS:-16} num_read_threads=${NUM_READ_THREADS:-16}
writes_per_second=${WRITES_PER_SEC:-$((80 * K))} # (only for readwhilewriting) # Only for readwhilewriting, readwhilemerging
writes_per_second=${WRITES_PER_SEC:-$((80 * K))}
num_nexts_per_seek=${NUM_NEXTS_PER_SEEK:-10} # (only for rangescanwhilewriting) num_nexts_per_seek=${NUM_NEXTS_PER_SEEK:-10} # (only for rangescanwhilewriting)
cache_size=$((1 * G)) cache_size=$((1 * G))
duration=${DURATION:-0} duration=${DURATION:-0}
@ -167,7 +168,7 @@ function run_readrandom {
} }
function run_readwhilewriting { function run_readwhilewriting {
echo "Reading $num_keys random keys from database whiling writing.." echo "Reading $num_keys random keys from database while writing.."
cmd="./db_bench $params_r --benchmarks=readwhilewriting \ cmd="./db_bench $params_r --benchmarks=readwhilewriting \
--use_existing_db=1 \ --use_existing_db=1 \
--num=$num_keys \ --num=$num_keys \
@ -180,8 +181,23 @@ function run_readwhilewriting {
eval $cmd eval $cmd
} }
function run_readwhilemerging {
echo "Reading $num_keys random keys from database while merging.."
cmd="./db_bench $params_r --benchmarks=readwhilemerging \
--use_existing_db=1 \
--num=$num_keys \
--sync=$syncval \
--disable_data_sync=0 \
--threads=$num_read_threads \
--writes_per_second=$writes_per_second \
--merge_operator=\"put\" \
2>&1 | tee -a $output_dir/benchmark_readwhilemerging.log"
echo $cmd | tee $output_dir/benchmark_readwhilemerging.log
eval $cmd
}
function run_rangescanwhilewriting { function run_rangescanwhilewriting {
echo "Range scan $num_keys random keys from database whiling writing.." echo "Range scan $num_keys random keys from database while writing.."
cmd="./db_bench $params_r --benchmarks=seekrandomwhilewriting \ cmd="./db_bench $params_r --benchmarks=seekrandomwhilewriting \
--use_existing_db=1 \ --use_existing_db=1 \
--num=$num_keys \ --num=$num_keys \
@ -249,6 +265,8 @@ for job in ${jobs[@]}; do
run_readrandom run_readrandom
elif [ $job = readwhilewriting ]; then elif [ $job = readwhilewriting ]; then
run_readwhilewriting run_readwhilewriting
elif [ $job = readwhilemerging ]; then
run_readwhilemerging
elif [ $job = rangescanwhilewriting ]; then elif [ $job = rangescanwhilewriting ]; then
run_rangescanwhilewriting run_rangescanwhilewriting
elif [ $job = updaterandom ]; then elif [ $job = updaterandom ]; then
@ -270,7 +288,7 @@ for job in ${jobs[@]}; do
if [[ $job == readrandom || $job == readwhilewriting \ if [[ $job == readrandom || $job == readwhilewriting \
|| $job == rangescanwhilewriting || $job == updaterandom \ || $job == rangescanwhilewriting || $job == updaterandom \
|| $job == mergerandom ]]; then || $job == mergerandom || $job == readwhilemerging ]]; then
lat=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \ lat=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \
| grep "ops\/sec" | awk '{print $3}') | grep "ops\/sec" | awk '{print $3}')
qps=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \ qps=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \