[JNI] Add readwhilewriting to Java db_bench

Summary: Add readwhilewriting to Java db_bench

Test Plan:
make jni
make jdb_bench

Reviewers: haobo, ankgup87, sdong, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17619
This commit is contained in:
Yueh-Hsuan Chiang 2014-04-10 16:12:04 -07:00
parent ddef6841b3
commit dfe2d2f3a2
2 changed files with 158 additions and 48 deletions

View file

@ -26,9 +26,14 @@ test: java
db_bench: java db_bench: java
javac org/rocksdb/benchmark/*.java javac org/rocksdb/benchmark/*.java
rm -rf /tmp/rocksdbjni-bench rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=1 --benchmarks=fillseq,readrandom java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=1 --benchmarks=fillseq,readrandom,readwhilewriting
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=2 --benchmarks=fillseq,readrandom rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=4 --benchmarks=fillseq,readrandom java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=2 --benchmarks=fillseq,readrandom,readwhilewriting
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=8 --benchmarks=fillseq,readrandom rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=16 --benchmarks=fillseq,readrandom java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=4 --benchmarks=fillseq,readrandom,readwhilewriting
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=32 --benchmarks=fillseq,readrandom rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=8 --benchmarks=fillseq,readrandom,readwhilewriting
rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=16 --benchmarks=fillseq,readrandom,readwhilewriting
rm -rf /tmp/rocksdbjni-bench
java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=32 --benchmarks=fillseq,readrandom,readwhilewriting

View file

@ -47,7 +47,7 @@ class Stats {
double seconds_; double seconds_;
long done_; long done_;
long found_; long found_;
long lastReportDone_; long lastOpTime_;
long nextReport_; long nextReport_;
long bytes_; long bytes_;
StringBuilder message_; StringBuilder message_;
@ -57,10 +57,10 @@ class Stats {
id_ = id; id_ = id;
nextReport_ = 100; nextReport_ = 100;
done_ = 0; done_ = 0;
lastReportDone_ = 0;
bytes_ = 0; bytes_ = 0;
seconds_ = 0; seconds_ = 0;
start_ = System.nanoTime(); start_ = System.nanoTime();
lastOpTime_ = start_;
finish_ = start_; finish_ = start_;
found_ = 0; found_ = 0;
message_ = new StringBuilder(""); message_ = new StringBuilder("");
@ -102,7 +102,7 @@ class Stats {
void finishedSingleOp(int bytes) { void finishedSingleOp(int bytes) {
done_++; done_++;
lastReportDone_ = System.nanoTime(); lastOpTime_ = System.nanoTime();
bytes_ += bytes; bytes_ += bytes;
if (done_ >= nextReport_) { if (done_ >= nextReport_) {
if (nextReport_ < 1000) { if (nextReport_ < 1000) {
@ -202,6 +202,16 @@ public class DbBenchmark {
super(tid, randSeed, numEntries, keyRange); super(tid, randSeed, numEntries, keyRange);
writeOpt_ = writeOpt; writeOpt_ = writeOpt;
entriesPerBatch_ = entriesPerBatch; entriesPerBatch_ = entriesPerBatch;
maxWritesPerSecond_ = -1;
}
public WriteTask(
int tid, long randSeed, long numEntries, long keyRange,
WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond) {
super(tid, randSeed, numEntries, keyRange);
writeOpt_ = writeOpt;
entriesPerBatch_ = entriesPerBatch;
maxWritesPerSecond_ = maxWritesPerSecond;
} }
@Override public void runTask() throws RocksDBException { @Override public void runTask() throws RocksDBException {
@ -211,11 +221,16 @@ public class DbBenchmark {
byte[] key = new byte[keySize_]; byte[] key = new byte[keySize_];
byte[] value = new byte[valueSize_]; byte[] value = new byte[valueSize_];
try {
if (entriesPerBatch_ == 1) { if (entriesPerBatch_ == 1) {
for (long i = 0; i < numEntries_; ++i) { for (long i = 0; i < numEntries_; ++i) {
getKey(key, i, keyRange_); getKey(key, i, keyRange_);
db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_)); db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_));
stats_.finishedSingleOp(keySize_ + valueSize_); stats_.finishedSingleOp(keySize_ + valueSize_);
writeRateControl(i);
if (isFinished()) {
return;
}
} }
} else { } else {
for (long i = 0; i < numEntries_; i += entriesPerBatch_) { for (long i = 0; i < numEntries_; i += entriesPerBatch_) {
@ -227,13 +242,32 @@ public class DbBenchmark {
} }
db_.write(writeOpt_, batch); db_.write(writeOpt_, batch);
batch.dispose(); batch.dispose();
writeRateControl(i);
if (isFinished()) {
return;
} }
} }
} }
} catch (InterruptedException e) {
// thread has been terminated.
}
}
protected void writeRateControl(long writeCount)
throws InterruptedException {
if (maxWritesPerSecond_ <= 0) return;
long minInterval =
writeCount * TimeUnit.SECONDS.toNanos(1) / maxWritesPerSecond_;
long interval = System.nanoTime() - stats_.start_;
if (minInterval - interval > TimeUnit.MILLISECONDS.toNanos(1)) {
TimeUnit.NANOSECONDS.sleep(minInterval - interval);
}
}
abstract protected void getKey(byte[] key, long id, long range); abstract protected void getKey(byte[] key, long id, long range);
protected WriteOptions writeOpt_; protected WriteOptions writeOpt_;
protected long entriesPerBatch_; protected long entriesPerBatch_;
protected long maxWritesPerSecond_;
} }
class WriteSequentialTask extends WriteTask { class WriteSequentialTask extends WriteTask {
@ -243,6 +277,14 @@ public class DbBenchmark {
super(tid, randSeed, numEntries, keyRange, super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch); writeOpt, entriesPerBatch);
} }
public WriteSequentialTask(
int tid, long randSeed, long numEntries, long keyRange,
WriteOptions writeOpt, long entriesPerBatch,
long maxWritesPerSecond) {
super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch,
maxWritesPerSecond);
}
@Override protected void getKey(byte[] key, long id, long range) { @Override protected void getKey(byte[] key, long id, long range) {
getFixedKey(key, id); getFixedKey(key, id);
} }
@ -255,6 +297,14 @@ public class DbBenchmark {
super(tid, randSeed, numEntries, keyRange, super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch); writeOpt, entriesPerBatch);
} }
public WriteRandomTask(
int tid, long randSeed, long numEntries, long keyRange,
WriteOptions writeOpt, long entriesPerBatch,
long maxWritesPerSecond) {
super(tid, randSeed, numEntries, keyRange,
writeOpt, entriesPerBatch,
maxWritesPerSecond);
}
@Override protected void getKey(byte[] key, long id, long range) { @Override protected void getKey(byte[] key, long id, long range) {
getRandomKey(key, range); getRandomKey(key, range);
} }
@ -278,6 +328,9 @@ public class DbBenchmark {
} else { } else {
stats_.finishedSingleOp(keySize_); stats_.finishedSingleOp(keySize_);
} }
if (isFinished()) {
return;
}
} }
} }
} }
@ -312,7 +365,9 @@ public class DbBenchmark {
useExisting_ = (boolean) flags.get(Flag.use_existing_db); useExisting_ = (boolean) flags.get(Flag.use_existing_db);
randSeed_ = (long) flags.get(Flag.seed); randSeed_ = (long) flags.get(Flag.seed);
databaseDir_ = (String) flags.get(Flag.db); databaseDir_ = (String) flags.get(Flag.db);
writesPerSeconds_ = (int) flags.get(Flag.writes_per_second);
gen_ = new RandomGenerator(compressionRatio_); gen_ = new RandomGenerator(compressionRatio_);
finishLock_ = new Object();
} }
private void run() throws RocksDBException { private void run() throws RocksDBException {
@ -325,29 +380,25 @@ public class DbBenchmark {
for (String benchmark : benchmarks_) { for (String benchmark : benchmarks_) {
List<Callable<Stats>> tasks = new ArrayList<Callable<Stats>>(); List<Callable<Stats>> tasks = new ArrayList<Callable<Stats>>();
List<Callable<Stats>> bgTasks = new ArrayList<Callable<Stats>>();
WriteOptions writeOpt = new WriteOptions(); WriteOptions writeOpt = new WriteOptions();
int currentTaskId = 0; int currentTaskId = 0;
int concurrentThreads = threadNum_;
boolean known = true; boolean known = true;
if (benchmark.equals("fillseq")) { if (benchmark.equals("fillseq")) {
tasks.add(new WriteSequentialTask( tasks.add(new WriteSequentialTask(
currentTaskId, randSeed_, num_, num_, writeOpt, 1)); currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
concurrentThreads = 1;
} else if (benchmark.equals("fillbatch")) { } else if (benchmark.equals("fillbatch")) {
tasks.add(new WriteRandomTask( tasks.add(new WriteRandomTask(
currentTaskId, randSeed_, num_ / 1000, num_, writeOpt, 1000)); currentTaskId++, randSeed_, num_ / 1000, num_, writeOpt, 1000));
concurrentThreads = 1;
} else if (benchmark.equals("fillrandom")) { } else if (benchmark.equals("fillrandom")) {
tasks.add(new WriteRandomTask( tasks.add(new WriteRandomTask(
currentTaskId, randSeed_, num_, num_, writeOpt, 1)); currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
concurrentThreads = 1;
} else if (benchmark.equals("fillsync")) { } else if (benchmark.equals("fillsync")) {
writeOpt.setSync(true); writeOpt.setSync(true);
tasks.add(new WriteRandomTask( tasks.add(new WriteRandomTask(
currentTaskId, randSeed_, num_ / 1000, num_ / 1000, currentTaskId++, randSeed_, num_ / 1000, num_ / 1000,
writeOpt, 1)); writeOpt, 1));
concurrentThreads = 1;
} else if (benchmark.equals("readseq")) { } else if (benchmark.equals("readseq")) {
for (int t = 0; t < threadNum_; ++t) { for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadSequentialTask( tasks.add(new ReadSequentialTask(
@ -359,6 +410,15 @@ public class DbBenchmark {
tasks.add(new ReadRandomTask( tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_)); currentTaskId++, randSeed_, reads_ / threadNum_, num_));
} }
} else if (benchmark.equals("readwhilewriting")) {
WriteTask writeTask = new WriteRandomTask(
-1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_);
writeTask.stats_.setExcludeFromMerge();
bgTasks.add(writeTask);
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
} else if (benchmark.equals("readhot")) { } else if (benchmark.equals("readhot")) {
for (int t = 0; t < threadNum_; ++t) { for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask( tasks.add(new ReadRandomTask(
@ -373,17 +433,34 @@ public class DbBenchmark {
} }
if (known) { if (known) {
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService bgExecutor = Executors.newCachedThreadPool();
try { try {
// measure only the main executor time
List<Future<Stats>> bgResults = new ArrayList<Future<Stats>>();
for (Callable bgTask : bgTasks) {
bgResults.add(bgExecutor.submit(bgTask));
}
start(); start();
List<Future<Stats>> results = executor.invokeAll(tasks); List<Future<Stats>> results = executor.invokeAll(tasks);
executor.shutdown(); executor.shutdown();
boolean finished = executor.awaitTermination(3, TimeUnit.DAYS); boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
// do something
stop(benchmark, results, concurrentThreads);
if (!finished) { if (!finished) {
// do something else System.out.format(
System.out.format("Benchmark %s was not finished before timeout."); "Benchmark %s was not finished before timeout.",
benchmark);
executor.shutdownNow();
} }
setFinished(true);
bgExecutor.shutdown();
finished = bgExecutor.awaitTermination(10, TimeUnit.SECONDS);
if (!finished) {
System.out.format(
"Benchmark %s was not finished before timeout.",
benchmark);
bgExecutor.shutdownNow();
}
stop(benchmark, results, currentTaskId);
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.err.println(e); System.err.println(e);
} }
@ -423,6 +500,7 @@ public class DbBenchmark {
} }
private void start() { private void start() {
setFinished(false);
startTime_ = System.nanoTime(); startTime_ = System.nanoTime();
} }
@ -449,7 +527,7 @@ public class DbBenchmark {
} }
System.out.printf( System.out.printf(
"%-12s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", "%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n",
benchmark, elapsedSeconds * 1e6 / num_, benchmark, elapsedSeconds * 1e6 / num_,
(stats.bytes_ / 1048576.0) / elapsedSeconds, (stats.bytes_ / 1048576.0) / elapsedSeconds,
taskFinishedCount, concurrentThreads); taskFinishedCount, concurrentThreads);
@ -536,15 +614,18 @@ public class DbBenchmark {
"fillrandom"), "fillrandom"),
"Comma-separated list of operations to run in the specified order\n" + "Comma-separated list of operations to run in the specified order\n" +
"\tActual benchmarks:\n" + "\tActual benchmarks:\n" +
"\t\tfillseq -- write N values in sequential key order in async mode\n" + "\t\tfillseq -- write N values in sequential key order in async mode.\n" +
"\t\tfillrandom -- write N values in random key order in async mode\n" + "\t\tfillrandom -- write N values in random key order in async mode.\n" +
"\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" + "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" +
"\t\t in random key order in sync mode\n" + "\t\t in random key order in sync mode.\n" +
"\t\tfillsync -- write N/100 values in random key order in sync mode\n" + "\t\tfillsync -- write N/100 values in random key order in sync mode.\n" +
"\t\tfill100K -- write N/1000 100K values in random order in async mode\n" + "\t\tfill100K -- write N/1000 100K values in random order in async mode.\n" +
"\t\treadseq -- read N times sequentially\n" + "\t\treadseq -- read N times sequentially.\n" +
"\t\treadrandom -- read N times in random order\n" + "\t\treadrandom -- read N times in random order.\n" +
"\t\treadhot -- read N times in random order from 1% section of DB\n" + "\t\treadhot -- read N times in random order from 1% section of DB.\n" +
"\t\treadwhilewriting -- measure the read performance of multiple readers\n" +
"\t\t with a bg single writer. The write rate of the bg\n" +
"\t\t is capped by --writes_per_second.\n" +
"\tMeta Operations:\n" + "\tMeta Operations:\n" +
"\t\tdelete -- delete DB") { "\t\tdelete -- delete DB") {
@Override public Object parseValue(String value) { @Override public Object parseValue(String value) {
@ -613,6 +694,15 @@ public class DbBenchmark {
} }
}, },
writes_per_second(10000,
"The write-rate of the background writer used in the\n" +
"`readwhilewriting` benchmark. Non-positive number indicates\n" +
"using an unbounded write-rate in `readwhilewriting` benchmark.") {
@Override public Object parseValue(String value) {
return Integer.parseInt(value);
}
},
cache_size(-1, cache_size(-1,
"Number of bytes to use as a cache of uncompressed data.\n" + "Number of bytes to use as a cache of uncompressed data.\n" +
"\tNegative means use default settings.") { "\tNegative means use default settings.") {
@ -682,6 +772,18 @@ public class DbBenchmark {
} }
} }
boolean isFinished() {
synchronized(finishLock_) {
return isFinished_;
}
}
void setFinished(boolean flag) {
synchronized(finishLock_) {
isFinished_ = flag;
}
}
RocksDB db_; RocksDB db_;
final List<String> benchmarks_; final List<String> benchmarks_;
final int num_; final int num_;
@ -690,10 +792,13 @@ public class DbBenchmark {
final int valueSize_; final int valueSize_;
final int writeBufferSize_; final int writeBufferSize_;
final int threadNum_; final int threadNum_;
final int writesPerSeconds_;
final long randSeed_; final long randSeed_;
final boolean useExisting_; final boolean useExisting_;
final String databaseDir_; final String databaseDir_;
final double compressionRatio_; final double compressionRatio_;
RandomGenerator gen_; RandomGenerator gen_;
long startTime_; long startTime_;
Object finishLock_;
boolean isFinished_;
} }