diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f2f9147503..04d679a682 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -479,6 +479,8 @@ static class std::shared_ptr dbstats; DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do" " --num reads."); +DEFINE_bool(finish_after_writes, false, "Write thread terminates after all writes are finished"); + DEFINE_bool(sync, false, "Sync all writes to disk"); DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync"); @@ -487,6 +489,9 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL"); +DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench", + "Truth key/values used when using verify"); + DEFINE_int32(num_levels, 7, "The total number of levels"); DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base, @@ -2175,6 +2180,37 @@ class Benchmark { return base_name + ToString(id); } +void VerifyDBFromDB(std::string& truth_db_name) { + DBWithColumnFamilies truth_db; + auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db); + if (!s.ok()) { + fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + exit(1); + } + ReadOptions ro; + ro.total_order_seek = true; + std::unique_ptr truth_iter(truth_db.db->NewIterator(ro)); + std::unique_ptr db_iter(db_.db->NewIterator(ro)); + // Verify that all the key/values in truth_db are retrivable in db with ::Get + fprintf(stderr, "Verifying db >= truth_db with ::Get...\n"); + for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) { + std::string value; + s = db_.db->Get(ro, truth_iter->key(), &value); + assert(s.ok()); + // TODO(myabandeh): provide debugging hints + assert(Slice(value) == truth_iter->value()); + } + // Verify that the db iterator does not give any extra key/value + fprintf(stderr, "Verifying db == truth_db...\n"); + for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) { + assert(truth_iter->Valid()); + assert(truth_iter->value() == db_iter->value()); + } + // No more key should be left unchecked in truth_db + assert(!truth_iter->Valid()); + fprintf(stderr, "...Verified\n"); +} + void Run() { if (!SanityCheck()) { exit(1); @@ -2393,6 +2429,8 @@ class Benchmark { method = &Benchmark::TimeSeries; } else if (name == "stats") { PrintStats("rocksdb.stats"); + } else if (name == "verify") { + VerifyDBFromDB(FLAGS_truth_db); } else if (name == "levelstats") { PrintStats("rocksdb.levelstats"); } else if (name == "sstables") { @@ -4160,14 +4198,30 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + uint32_t written = 0; + bool hint_printed = false; while (true) { DB* db = SelectDB(thread); { MutexLock l(&thread->shared->mu); + if (FLAGS_finish_after_writes && written == writes_) { + fprintf(stderr, "Exiting the writer after %u writes...\n", written); + break; + } if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { // Other threads have finished - break; + if (FLAGS_finish_after_writes) { + // Wait for the writes to be finished + if (!hint_printed) { + fprintf(stderr, "Reads are finished. Have %d more writes to do\n", + (int)writes_ - written); + hint_printed = true; + } + } else { + // Finish the write immediately + break; + } } } @@ -4179,6 +4233,7 @@ class Benchmark { } else { s = db->Merge(write_options_, key, gen.Generate(value_size_)); } + written++; if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());