diff --git a/db/db_bench.cc b/db/db_bench.cc index 150ac46664..07b04719ef 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -79,6 +79,7 @@ DEFINE_string(benchmarks, "readtocache," "readreverse," "readwhilewriting," + "readwhilemerging," "readrandomwriterandom," "updaterandom," "randomwithverify," @@ -113,6 +114,8 @@ DEFINE_string(benchmarks, "of DB\n" "\treadwhilewriting -- 1 writer, N threads doing random " "reads\n" + "\treadwhilemerging -- 1 merger, N threads doing random " + "reads\n" "\treadrandomwriterandom -- N threads doing random-read, " "random-write\n" "\tprefixscanrandom -- prefix scan N times in random order\n" @@ -180,6 +183,10 @@ DEFINE_int32(seek_nexts, 0, "How many times to call Next() after Seek() in " "fillseekseq and seekrandom"); +DEFINE_bool(reverse_iterator, false, + "When true use Prev rather than Next for iterators that do " + "Seek and then Next"); + DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); DEFINE_int64(batch_size, 1, "Batch size"); @@ -316,8 +323,9 @@ static class std::shared_ptr dbstats; DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do" " --num reads."); -DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes per second." - " No limit when <= 0. Only for the readwhilewriting test."); +DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes and merges " + " per second. No limit when <= 0. Only for the readwhilewriting " + " and readwhilemerging tests."); DEFINE_bool(sync, false, "Sync all writes to disk"); @@ -560,6 +568,11 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo " "table becomes an identity function. This is only valid when key " "is 8 bytes"); +enum PutOrMerge { + kPut, + kMerge +}; + enum RepFactory { kSkipList, kPrefixHash, @@ -1588,6 +1601,9 @@ class Benchmark { } else if (name == Slice("readwhilewriting")) { num_threads++; // Add extra thread for writing method = &Benchmark::ReadWhileWriting; + } else if (name == Slice("readwhilemerging")) { + num_threads++; // Add extra thread for writing + method = &Benchmark::ReadWhileMerging; } else if (name == Slice("readrandomwriterandom")) { method = &Benchmark::ReadRandomWriteRandom; } else if (name == Slice("readrandommergerandom")) { @@ -2605,7 +2621,7 @@ class Benchmark { if (thread->tid > 0) { IteratorCreation(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } @@ -2668,7 +2684,12 @@ class Benchmark { Slice value = iter_to_use->value(); memcpy(value_buffer, value.data(), std::min(value.size(), sizeof(value_buffer))); - iter_to_use->Next(); + + if (!FLAGS_reverse_iterator) { + iter_to_use->Next(); + } else { + iter_to_use->Prev(); + } assert(iter_to_use->status().ok()); } @@ -2692,7 +2713,7 @@ class Benchmark { if (thread->tid > 0) { SeekRandom(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } @@ -2733,11 +2754,19 @@ class Benchmark { if (thread->tid > 0) { ReadRandom(thread); } else { - BGWriter(thread); + BGWriter(thread, kPut); } } - void BGWriter(ThreadState* thread) { + void ReadWhileMerging(ThreadState* thread) { + if (thread->tid > 0) { + ReadRandom(thread); + } else { + BGWriter(thread, kMerge); + } + } + + void BGWriter(ThreadState* thread, enum PutOrMerge write_merge) { // Special thread that keeps writing until other threads are done. RandomGenerator gen; double last = FLAGS_env->NowMicros(); @@ -2767,9 +2796,16 @@ class Benchmark { } GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); - Status s = db->Put(write_options_, key, gen.Generate(value_size_)); + Status s; + + if (write_merge == kPut) { + s = db->Put(write_options_, key, gen.Generate(value_size_)); + } else { + s = db->Merge(write_options_, key, gen.Generate(value_size_)); + } + if (!s.ok()) { - fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); exit(1); } thread->stats.FinishedOps(&db_, db_.db, 1); @@ -3207,7 +3243,11 @@ class Benchmark { thread->stats.FinishedOps(nullptr, db, 1); for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) { - iter->Next(); + if (!FLAGS_reverse_iterator) { + iter->Next(); + } else { + iter->Prev(); + } GenerateKeyFromInt(++i, FLAGS_num, &key); assert(iter->Valid() && iter->key() == key); thread->stats.FinishedOps(nullptr, db, 1); diff --git a/tools/benchmark.sh b/tools/benchmark.sh index f412131e09..07850069ca 100755 --- a/tools/benchmark.sh +++ b/tools/benchmark.sh @@ -3,7 +3,7 @@ if [ $# -ne 1 ]; then echo -n "./benchmark.sh [bulkload/fillseq/overwrite/filluniquerandom/" - echo "readrandom/readwhilewriting/updaterandom/mergerandom]" + echo "readrandom/readwhilewriting/readwhilemerging/updaterandom/mergerandom]" exit 0 fi @@ -36,7 +36,8 @@ if [ ! -z $DB_BENCH_NO_SYNC ]; then fi num_read_threads=${NUM_READ_THREADS:-16} -writes_per_second=${WRITES_PER_SEC:-$((80 * K))} # (only for readwhilewriting) +# Only for readwhilewriting, readwhilemerging +writes_per_second=${WRITES_PER_SEC:-$((80 * K))} num_nexts_per_seek=${NUM_NEXTS_PER_SEEK:-10} # (only for rangescanwhilewriting) cache_size=$((1 * G)) duration=${DURATION:-0} @@ -167,7 +168,7 @@ function run_readrandom { } function run_readwhilewriting { - echo "Reading $num_keys random keys from database whiling writing.." + echo "Reading $num_keys random keys from database while writing.." cmd="./db_bench $params_r --benchmarks=readwhilewriting \ --use_existing_db=1 \ --num=$num_keys \ @@ -180,8 +181,23 @@ function run_readwhilewriting { eval $cmd } +function run_readwhilemerging { + echo "Reading $num_keys random keys from database while merging.." + cmd="./db_bench $params_r --benchmarks=readwhilemerging \ + --use_existing_db=1 \ + --num=$num_keys \ + --sync=$syncval \ + --disable_data_sync=0 \ + --threads=$num_read_threads \ + --writes_per_second=$writes_per_second \ + --merge_operator=\"put\" \ + 2>&1 | tee -a $output_dir/benchmark_readwhilemerging.log" + echo $cmd | tee $output_dir/benchmark_readwhilemerging.log + eval $cmd +} + function run_rangescanwhilewriting { - echo "Range scan $num_keys random keys from database whiling writing.." + echo "Range scan $num_keys random keys from database while writing.." cmd="./db_bench $params_r --benchmarks=seekrandomwhilewriting \ --use_existing_db=1 \ --num=$num_keys \ @@ -249,6 +265,8 @@ for job in ${jobs[@]}; do run_readrandom elif [ $job = readwhilewriting ]; then run_readwhilewriting + elif [ $job = readwhilemerging ]; then + run_readwhilemerging elif [ $job = rangescanwhilewriting ]; then run_rangescanwhilewriting elif [ $job = updaterandom ]; then @@ -270,7 +288,7 @@ for job in ${jobs[@]}; do if [[ $job == readrandom || $job == readwhilewriting \ || $job == rangescanwhilewriting || $job == updaterandom \ - || $job == mergerandom ]]; then + || $job == mergerandom || $job == readwhilemerging ]]; then lat=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \ | grep "ops\/sec" | awk '{print $3}') qps=$(grep "micros\/op" "$output_dir/benchmark_$job.log" \