From 465b9103f83329a1edfe477aa87fb06300ec0ab2 Mon Sep 17 00:00:00 2001 From: Vamsi Ponnekanti Date: Wed, 20 Feb 2013 15:57:27 -0800 Subject: [PATCH] [Add a second kind of verification to db_stress Summary: Currently the test tracks all writes in memory and uses it for verification at the end. This has 4 problems: (a) It needs mutex for each write to ensure in-memory update and leveldb update are done atomically. This slows down the benchmark. (b) Verification phase at the end is time consuming as well (c) Does not test batch writes or snapshots (d) We cannot kill the test and restart multiple times in a loop because in-memory state will be lost. I am adding a FLAGS_multi that does MultiGet/MultiPut/MultiDelete instead of get/put/delete to get/put/delete a group of related keys with same values atomically. Every get retrieves the group of keys and checks that their values are same. This does not have the above problems but the downside is that it does less amount of validation than the other approach. Test Plan: This whole this is a test! Here is a small run. I am doing larger run now. [nponnekanti@dev902 /data/users/nponnekanti/rocksdb] ./db_stress --ops_per_thread=10000 --multi=1 --ops_per_key=25 LevelDB version : 1.5 Number of threads : 32 Ops per thread : 10000 Read percentage : 10 Delete percentage : 30 Max key : 2147483648 Num times DB reopens: 10 Num keys per lock : 4 Compression : snappy ------------------------------------------------ Creating 536870912 locks 2013/02/20-16:59:32 Starting database operations Created bg thread 0x7f9ebcfff700 2013/02/20-16:59:37 Reopening database for the 1th time 2013/02/20-16:59:46 Reopening database for the 2th time 2013/02/20-16:59:57 Reopening database for the 3th time 2013/02/20-17:00:11 Reopening database for the 4th time 2013/02/20-17:00:25 Reopening database for the 5th time 2013/02/20-17:00:36 Reopening database for the 6th time 2013/02/20-17:00:47 Reopening database for the 7th time 2013/02/20-17:00:59 Reopening database for the 8th time 2013/02/20-17:01:10 Reopening database for the 9th time 2013/02/20-17:01:20 Reopening database for the 10th time 2013/02/20-17:01:31 Reopening database for the 11th time 2013/02/20-17:01:31 Starting verification Stress Test : 109.125 micros/op 22191 ops/sec : Wrote 0.00 MB (0.23 MB/sec) (59% of 32 ops) : Deleted 10 times 2013/02/20-17:01:31 Verification successful Revert Plan: OK Task ID: # Reviewers: dhruba, emayanke Reviewed By: emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D8733 --- tools/db_stress.cc | 185 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 174 insertions(+), 11 deletions(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index ed68626a03..900b6f38cf 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -39,6 +39,16 @@ static uint32_t FLAGS_seed = 2341234; // Max number of key/values to place in database static long FLAGS_max_key = 2 * KB * KB * KB; +// If set, the test uses MultiGet, MultiPut and MultiDelete that +// do a different kind of validation during the test itself, +// rather than at the end. This is meant to solve the following +// problems at the expense of doing less degree of validation. +// (a) No need to acquire mutexes during writes (less cache flushes +// in multi-core leading to speed up) +// (b) No long validation at the end (more speed up) +// (c) Also test snapshot and atomicity of batch writes +static bool FLAGS_test_batches_snapshots = false; + // Number of concurrent threads to run. static int FLAGS_threads = 32; @@ -165,6 +175,8 @@ class Stats { long done_; long writes_; long deletes_; + long founds_; + long errors_; int next_report_; size_t bytes_; double last_op_finish_; @@ -179,6 +191,8 @@ class Stats { done_ = 0; writes_ = 0; deletes_ = 0; + founds_ = 0; + errors_ = 0; bytes_ = 0; seconds_ = 0; start_ = FLAGS_env->NowMicros(); @@ -228,13 +242,21 @@ class Stats { } } - void AddBytesForOneWrite(size_t n) { - writes_ ++; - bytes_ += n; + void AddBytesForWrites(int nwrites, size_t nbytes) { + writes_ += nwrites; + bytes_ += nbytes; } - void AddOneDelete() { - deletes_ ++; + void AddDeletes(int n) { + deletes_ += n; + } + + void AddFounds(int n) { + founds_ += n; + } + + void AddErrors(int n) { + errors_ += n; } void Report(const char* name) { @@ -255,6 +277,8 @@ class Stats { fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n", "", bytes_mb, rate, (100*writes_)/done_, done_); fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); + fprintf(stdout, "%-12s: Found lookups %ld times\n", "", founds_); + fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_); if (FLAGS_histogram) { fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); @@ -281,10 +305,17 @@ class SharedState { start_(false), start_verify_(false), stress_test_(stress_test) { + if (FLAGS_test_batches_snapshots) { + key_locks_ = nullptr; + values_ = nullptr; + fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); + return; + } values_ = new uint32_t[max_key_]; for (long i = 0; i < max_key_; i++) { values_[i] = SENTINEL; } + long num_locks = (max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { num_locks ++; @@ -533,7 +564,10 @@ class StressTest { } } - thread->shared->GetStressTest()->VerifyDb(*(thread->shared), thread->tid); + if (!FLAGS_test_batches_snapshots) { + thread->shared->GetStressTest()->VerifyDb(*(thread->shared), + thread->tid); + } { MutexLock l(shared->GetMutex()); @@ -545,6 +579,119 @@ class StressTest { } + // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... + // ("9"+K, "9"+V) in DB atomically i.e in a single batch. + // Also refer MultiGet. + Status MultiPut(ThreadState* thread, + const WriteOptions& writeoptions, + const Slice& key, const Slice& value, size_t sz) { + std::string keys[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + std::string values[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + Slice value_slices[10]; + WriteBatch batch; + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + values[i] += value.ToString(); + value_slices[i] = values[i]; + batch.Put(keys[i], value_slices[i]); + } + + s = db_->Write(writeoptions, &batch); + if (!s.ok()) { + fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + // we did 10 writes each of size sz + 1 + thread->stats.AddBytesForWrites(10, (sz + 1) * 10); + } + + return s; + } + + + // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) + // in DB atomically i.e in a single batch. Also refer MultiGet. + Status MultiDelete(ThreadState* thread, + const WriteOptions& writeoptions, + const Slice& key) { + std::string keys[10] = {"9", "7", "5", "3", "1", + "8", "6", "4", "2", "0"}; + + WriteBatch batch; + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + batch.Delete(keys[i]); + } + + s = db_->Write(writeoptions, &batch); + if (!s.ok()) { + fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddDeletes(10); + } + + return s; + } + + // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K + // in the same snapshot, and verifies that all the values are of the form + // "0"+V, "1"+V,..."9"+V. + // ASSUMES that MultiPut was used to put (K, V) into the DB. + Status MultiGet(ThreadState* thread, + const ReadOptions& readoptions, + const Slice& key, std::string* value) { + std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + Slice key_slices[10]; + std::string values[10]; + ReadOptions readoptionscopy = readoptions; + readoptionscopy.snapshot = db_->GetSnapshot(); + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + key_slices[i] = keys[i]; + s = db_->Get(readoptionscopy, key_slices[i], value); + if (!s.ok() && !s.IsNotFound()) { + fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + values[i] = ""; + thread->stats.AddErrors(1); + // we continue after error rather than exiting so that we can + // find more errors if any + } else if (s.IsNotFound()) { + values[i] = ""; + } else { + values[i] = *value; + + char expected_prefix = (keys[i])[0]; + char actual_prefix = (values[i])[0]; + if (actual_prefix != expected_prefix) { + fprintf(stderr, "expected prefix = %c actual = %c\n", + expected_prefix, actual_prefix); + } + (values[i])[0] = ' '; // blank out the differing character + thread->stats.AddFounds(1); + } + } + db_->ReleaseSnapshot(readoptionscopy.snapshot); + + // Now that we retrieved all values, check that they all match + for (int i = 1; i < 10; i++) { + if (values[i] != values[0]) { + fprintf(stderr, "inconsistent values for key %s: %s, %s\n", + key.ToString().c_str(), values[0].c_str(), + values[i].c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any + } + } + + return s; + } + void OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); WriteOptions write_opts; @@ -573,36 +720,45 @@ class StressTest { thread->stats.Start(); } } + long rand_key = thread->rand.Next() % max_key; Slice key((char*)&rand_key, sizeof(rand_key)); //Read:10%;Delete:30%;Write:60% unsigned int probability_operation = thread->rand.Uniform(100); if (probability_operation < FLAGS_readpercent) { // read load - db_->Get(read_opts, key, &from_db); + FLAGS_test_batches_snapshots ? + MultiGet(thread, read_opts, key, &from_db) : + db_->Get(read_opts, key, &from_db); } else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) { //introduce delete load - { + if (!FLAGS_test_batches_snapshots) { MutexLock l(thread->shared->GetMutexForKey(rand_key)); thread->shared->Delete(rand_key); db_->Delete(write_opts, key); + thread->stats.AddDeletes(1); + } else { + MultiDelete(thread, write_opts, key); } - thread->stats.AddOneDelete(); + } else { // write load uint32_t value_base = thread->rand.Next(); size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); - { + if (!FLAGS_test_batches_snapshots) { MutexLock l(thread->shared->GetMutexForKey(rand_key)); if (FLAGS_verify_before_write) { VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true); } thread->shared->Put(rand_key, value_base); db_->Put(write_opts, key, v); + thread->stats.AddBytesForWrites(1, sz); + } else { + MultiPut(thread, write_opts, key, v, sz); } + PrintKeyValue(rand_key, value, sz); - thread->stats.AddBytesForOneWrite(sz); } thread->stats.FinishedSingleOp(); } @@ -682,7 +838,11 @@ class StressTest { fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent); fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key); + fprintf(stdout, "Ratio #ops/#keys : %ld\n", + (FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key); fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen); + fprintf(stdout, "Batches/snapshots : %d\n", + FLAGS_test_batches_snapshots); fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); @@ -807,6 +967,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--verify_before_write=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_verify_before_write = n; + } else if (sscanf(argv[i], "--test_batches_snapshots=%d%c", &n, &junk) == 1 + && (n == 0 || n == 1)) { + FLAGS_test_batches_snapshots = n; } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { FLAGS_threads = n; } else if (sscanf(argv[i], "--value_size_mult=%d%c", &n, &junk) == 1) {