From dfccc7b4e250dac30a44a2a653aac4f780352736 Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Wed, 18 Mar 2015 13:50:52 -0700 Subject: [PATCH] Add readwhilemerging benchmark Summary: This is like readwhilewriting but uses Merge rather than Put in the writer thread. I am using it for in-progress benchmarks. I don't think the other benchmarks for Merge cover this behavior. The purpose for this test is to measure read performance when readers might have to merge results. This will also benefit from work-in-progress to add skewed key generation. Task ID: # Blame Rev: Test Plan: Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D35115 --- db/db_bench.cc | 60 ++++++++++++++++++++++++++++++++++++++-------- tools/benchmark.sh | 28 ++++++++++++++++++---- 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 150ac46664..07b04719ef 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -79,6 +79,7 @@ DEFINE_string(benchmarks, "readtocache," "readreverse," "readwhilewriting," + "readwhilemerging," "readrandomwriterandom," "updaterandom," "randomwithverify," @@ -113,6 +114,8 @@ DEFINE_string(benchmarks, "of DB\n" "\treadwhilewriting -- 1 writer, N threads doing random " "reads\n" + "\treadwhilemerging -- 1 merger, N threads doing random " + "reads\n" "\treadrandomwriterandom -- N threads doing random-read, " "random-write\n" "\tprefixscanrandom -- prefix scan N times in random order\n" @@ -180,6 +183,10 @@ DEFINE_int32(seek_nexts, 0, "How many times to call Next() after Seek() in " "fillseekseq and seekrandom"); +DEFINE_bool(reverse_iterator, false, + "When true use Prev rather than Next for iterators that do " + "Seek and then Next"); + DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); 
DEFINE_int64(batch_size, 1, "Batch size"); @@ -316,8 +323,9 @@ static class std::shared_ptr dbstats; DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do" " --num reads."); -DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes per second." " No limit when <= 0. Only for the readwhilewriting test."); +DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes and merges " "per second. No limit when <= 0. Only for the readwhilewriting " "and readwhilemerging tests."); DEFINE_bool(sync, false, "Sync all writes to disk"); @@ -560,6 +568,11 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo " "table becomes an identity function. This is only valid when key " "is 8 bytes"); +enum PutOrMerge { + kPut, + kMerge +}; + enum RepFactory { kSkipList, kPrefixHash, @@ -1588,6 +1601,9 @@ class Benchmark { } else if (name == Slice("readwhilewriting")) { num_threads++; // Add extra thread for writing method = &Benchmark::ReadWhileWriting; + } else if (name == Slice("readwhilemerging")) { num_threads++; // Add extra thread for merging + method = &Benchmark::ReadWhileMerging; } else if (name == Slice("readrandomwriterandom")) { method = &Benchmark::ReadRandomWriteRandom; } else if (name == Slice("readrandommergerandom")) { @@ -2605,7 +2621,7 @@ class Benchmark { if (thread->tid > 0) { IteratorCreation(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } @@ -2668,7 +2684,12 @@ class Benchmark { Slice value = iter_to_use->value(); memcpy(value_buffer, value.data(), std::min(value.size(), sizeof(value_buffer))); - iter_to_use->Next(); + + if (!FLAGS_reverse_iterator) { + iter_to_use->Next(); + } else { + iter_to_use->Prev(); + } assert(iter_to_use->status().ok()); } @@ -2692,7 +2713,7 @@ class Benchmark { if (thread->tid > 0) { SeekRandom(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } @@ -2733,11 +2754,19 @@ class Benchmark { if (thread->tid > 0) { 
ReadRandom(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } - void BGWriter(ThreadState* thread) { + void ReadWhileMerging(ThreadState* thread) { + if (thread->tid > 0) { + ReadRandom(thread); + } else { + BGWriter(thread, kMerge); + } + } + + void BGWriter(ThreadState* thread, enum PutOrMerge write_merge) { // Special thread that keeps writing until other threads are done. RandomGenerator gen; double last = FLAGS_env->NowMicros(); @@ -2767,9 +2796,16 @@ class Benchmark { } GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); - Status s = db->Put(write_options_, key, gen.Generate(value_size_)); + Status s; + + if (write_merge == kPut) { + s = db->Put(write_options_, key, gen.Generate(value_size_)); + } else { + s = db->Merge(write_options_, key, gen.Generate(value_size_)); + } + if (!s.ok()) { - fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); exit(1); } thread->stats.FinishedOps(&db_, db_.db, 1); @@ -3207,7 +3243,11 @@ class Benchmark { thread->stats.FinishedOps(nullptr, db, 1); for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) { - iter->Next(); + if (!FLAGS_reverse_iterator) { + iter->Next(); + } else { + iter->Prev(); + } GenerateKeyFromInt(++i, FLAGS_num, &key); assert(iter->Valid() && iter->key() == key); thread->stats.FinishedOps(nullptr, db, 1); diff --git a/tools/benchmark.sh b/tools/benchmark.sh index f412131e09..07850069ca 100755 --- a/tools/benchmark.sh +++ b/tools/benchmark.sh @@ -3,7 +3,7 @@ if [ $# -ne 1 ]; then echo -n "./benchmark.sh [bulkload/fillseq/overwrite/filluniquerandom/" - echo "readrandom/readwhilewriting/updaterandom/mergerandom]" + echo "readrandom/readwhilewriting/readwhilemerging/updaterandom/mergerandom]" exit 0 fi @@ -36,7 +36,8 @@ if [ ! 
-z $DB_BENCH_NO_SYNC ]; then fi num_read_threads=${NUM_READ_THREADS:-16} -writes_per_second=${WRITES_PER_SEC:-$((80 * K))} # (only for readwhilewriting) +# Only for readwhilewriting, readwhilemerging +writes_per_second=${WRITES_PER_SEC:-$((80 * K))} num_nexts_per_seek=${NUM_NEXTS_PER_SEEK:-10} # (only for rangescanwhilewriting) cache_size=$((1 * G)) duration=${DURATION:-0} @@ -167,7 +168,7 @@ function run_readrandom { } function run_readwhilewriting { - echo "Reading $num_keys random keys from database whiling writing.." + echo "Reading $num_keys random keys from database while writing.." cmd="./db_bench $params_r --benchmarks=readwhilewriting \ --use_existing_db=1 \ --num=$num_keys \ @@ -180,8 +181,23 @@ function run_readwhilewriting { eval $cmd } +function run_readwhilemerging { + echo "Reading $num_keys random keys from database while merging.." + cmd="./db_bench $params_r --benchmarks=readwhilemerging \ + --use_existing_db=1 \ + --num=$num_keys \ + --sync=$syncval \ + --disable_data_sync=0 \ + --threads=$num_read_threads \ + --writes_per_second=$writes_per_second \ + --merge_operator=\"put\" \ + 2>&1 | tee -a $output_dir/benchmark_readwhilemerging.log" + echo $cmd | tee $output_dir/benchmark_readwhilemerging.log + eval $cmd +} + function run_rangescanwhilewriting { - echo "Range scan $num_keys random keys from database whiling writing.." + echo "Range scan $num_keys random keys from database while writing.." 
cmd="./db_bench $params_r --benchmarks=seekrandomwhilewriting \ --use_existing_db=1 \ --num=$num_keys \ @@ -249,6 +265,8 @@ for job in ${jobs[@]}; do run_readrandom elif [ $job = readwhilewriting ]; then run_readwhilewriting + elif [ $job = readwhilemerging ]; then + run_readwhilemerging elif [ $job = rangescanwhilewriting ]; then run_rangescanwhilewriting elif [ $job = updaterandom ]; then @@ -270,7 +288,7 @@ for job in ${jobs[@]}; do if [[ $job == readrandom || $job == readwhilewriting \ || $job == rangescanwhilewriting || $job == updaterandom \ - || $job == mergerandom ]]; then + || $job == mergerandom || $job == readwhilemerging ]]; then lat=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \ | grep "ops\/sec" | awk '{print $3}') qps=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \