diff --git a/tools/db_stress.cc b/tools/db_stress.cc index ed68626a03..900b6f38cf 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -39,6 +39,16 @@ static uint32_t FLAGS_seed = 2341234; // Max number of key/values to place in database static long FLAGS_max_key = 2 * KB * KB * KB; +// If set, the test uses MultiGet, MultiPut and MultiDelete that +// do a different kind of validation during the test itself, +// rather than at the end. This is meant to solve the following +// problems at the expense of doing less degree of validation. +// (a) No need to acquire mutexes during writes (less cache flushes +// in multi-core leading to speed up) +// (b) No long validation at the end (more speed up) +// (c) Also test snapshot and atomicity of batch writes +static bool FLAGS_test_batches_snapshots = false; + // Number of concurrent threads to run. static int FLAGS_threads = 32; @@ -165,6 +175,8 @@ class Stats { long done_; long writes_; long deletes_; + long founds_; + long errors_; int next_report_; size_t bytes_; double last_op_finish_; @@ -179,6 +191,8 @@ class Stats { done_ = 0; writes_ = 0; deletes_ = 0; + founds_ = 0; + errors_ = 0; bytes_ = 0; seconds_ = 0; start_ = FLAGS_env->NowMicros(); @@ -228,13 +242,21 @@ class Stats { } } - void AddBytesForOneWrite(size_t n) { - writes_ ++; - bytes_ += n; + void AddBytesForWrites(int nwrites, size_t nbytes) { + writes_ += nwrites; + bytes_ += nbytes; } - void AddOneDelete() { - deletes_ ++; + void AddDeletes(int n) { + deletes_ += n; + } + + void AddFounds(int n) { + founds_ += n; + } + + void AddErrors(int n) { + errors_ += n; } void Report(const char* name) { @@ -255,6 +277,8 @@ class Stats { fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n", "", bytes_mb, rate, (100*writes_)/done_, done_); fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); + fprintf(stdout, "%-12s: Found lookups %ld times\n", "", founds_); + fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_); if (FLAGS_histogram) { fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); @@ -281,10 +305,17 @@ class SharedState { start_(false), start_verify_(false), stress_test_(stress_test) { + if (FLAGS_test_batches_snapshots) { + key_locks_ = nullptr; + values_ = nullptr; + fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); + return; + } values_ = new uint32_t[max_key_]; for (long i = 0; i < max_key_; i++) { values_[i] = SENTINEL; } + long num_locks = (max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { num_locks ++; @@ -533,7 +564,10 @@ class StressTest { } } - thread->shared->GetStressTest()->VerifyDb(*(thread->shared), thread->tid); + if (!FLAGS_test_batches_snapshots) { + thread->shared->GetStressTest()->VerifyDb(*(thread->shared), + thread->tid); + } { MutexLock l(shared->GetMutex()); @@ -545,6 +579,119 @@ class StressTest { } + // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... + // ("9"+K, "9"+V) in DB atomically i.e in a single batch. + // Also refer MultiGet. + Status MultiPut(ThreadState* thread, + const WriteOptions& writeoptions, + const Slice& key, const Slice& value, size_t sz) { + std::string keys[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + std::string values[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + Slice value_slices[10]; + WriteBatch batch; + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + values[i] += value.ToString(); + value_slices[i] = values[i]; + batch.Put(keys[i], value_slices[i]); + } + + s = db_->Write(writeoptions, &batch); + if (!s.ok()) { + fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + // we did 10 writes each of size sz + 1 + thread->stats.AddBytesForWrites(10, (sz + 1) * 10); + } + + return s; + } + + + // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) + // in DB atomically i.e in a single batch. Also refer MultiGet. + Status MultiDelete(ThreadState* thread, + const WriteOptions& writeoptions, + const Slice& key) { + std::string keys[10] = {"9", "7", "5", "3", "1", + "8", "6", "4", "2", "0"}; + + WriteBatch batch; + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + batch.Delete(keys[i]); + } + + s = db_->Write(writeoptions, &batch); + if (!s.ok()) { + fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddDeletes(10); + } + + return s; + } + + // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K + // in the same snapshot, and verifies that all the values are of the form + // "0"+V, "1"+V,..."9"+V. + // ASSUMES that MultiPut was used to put (K, V) into the DB. + Status MultiGet(ThreadState* thread, + const ReadOptions& readoptions, + const Slice& key, std::string* value) { + std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + Slice key_slices[10]; + std::string values[10]; + ReadOptions readoptionscopy = readoptions; + readoptionscopy.snapshot = db_->GetSnapshot(); + Status s; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + key_slices[i] = keys[i]; + s = db_->Get(readoptionscopy, key_slices[i], value); + if (!s.ok() && !s.IsNotFound()) { + fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + values[i] = ""; + thread->stats.AddErrors(1); + // we continue after error rather than exiting so that we can + // find more errors if any + } else if (s.IsNotFound()) { + values[i] = ""; + } else { + values[i] = *value; + + char expected_prefix = (keys[i])[0]; + char actual_prefix = (values[i])[0]; + if (actual_prefix != expected_prefix) { + fprintf(stderr, "expected prefix = %c actual = %c\n", + expected_prefix, actual_prefix); + } + (values[i])[0] = ' '; // blank out the differing character + thread->stats.AddFounds(1); + } + } + db_->ReleaseSnapshot(readoptionscopy.snapshot); + + // Now that we retrieved all values, check that they all match + for (int i = 1; i < 10; i++) { + if (values[i] != values[0]) { + fprintf(stderr, "inconsistent values for key %s: %s, %s\n", + key.ToString().c_str(), values[0].c_str(), + values[i].c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any + } + } + + return s; + } + void OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); WriteOptions write_opts; @@ -573,36 +720,45 @@ class StressTest { thread->stats.Start(); } } + long rand_key = thread->rand.Next() % max_key; Slice key((char*)&rand_key, sizeof(rand_key)); //Read:10%;Delete:30%;Write:60% unsigned int probability_operation = thread->rand.Uniform(100); if (probability_operation < FLAGS_readpercent) { // read load - db_->Get(read_opts, key, &from_db); + FLAGS_test_batches_snapshots ? + MultiGet(thread, read_opts, key, &from_db) : + db_->Get(read_opts, key, &from_db); } else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) { //introduce delete load - { + if (!FLAGS_test_batches_snapshots) { MutexLock l(thread->shared->GetMutexForKey(rand_key)); thread->shared->Delete(rand_key); db_->Delete(write_opts, key); + thread->stats.AddDeletes(1); + } else { + MultiDelete(thread, write_opts, key); } - thread->stats.AddOneDelete(); + } else { // write load uint32_t value_base = thread->rand.Next(); size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); - { + if (!FLAGS_test_batches_snapshots) { MutexLock l(thread->shared->GetMutexForKey(rand_key)); if (FLAGS_verify_before_write) { VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true); } thread->shared->Put(rand_key, value_base); db_->Put(write_opts, key, v); + thread->stats.AddBytesForWrites(1, sz); + } else { + MultiPut(thread, write_opts, key, v, sz); } + PrintKeyValue(rand_key, value, sz); - thread->stats.AddBytesForOneWrite(sz); } thread->stats.FinishedSingleOp(); } @@ -682,7 +838,11 @@ class StressTest { fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent); fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key); + fprintf(stdout, "Ratio #ops/#keys : %ld\n", + (FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key); fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen); + fprintf(stdout, "Batches/snapshots : %d\n", + FLAGS_test_batches_snapshots); fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); @@ -807,6 +967,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--verify_before_write=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_verify_before_write = n; + } else if (sscanf(argv[i], "--test_batches_snapshots=%d%c", &n, &junk) == 1 + && (n == 0 || n == 1)) { + FLAGS_test_batches_snapshots = n; } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { FLAGS_threads = n; } else if (sscanf(argv[i], "--value_size_mult=%d%c", &n, &junk) == 1) {