diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 3d889ced75..9df690465d 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -98,8 +98,11 @@ static int FLAGS_level0_stop_writes_trigger = 12; // Number of files in level-0 that will slow down writes. static int FLAGS_level0_slowdown_writes_trigger = 8; -// Ratio of reads to writes (expressed as a percentage) -static unsigned int FLAGS_readwritepercent = 10; +// Ratio of reads to total workload (expressed as a percentage) +static unsigned int FLAGS_readpercent = 10; + +// Ratio of deletes to total workload (expressed as a percentage) +static unsigned int FLAGS_delpercent = 30; // Option to disable compation triggered by read. static int FLAGS_disable_seek_compaction = false; @@ -133,6 +136,7 @@ class Stats { double seconds_; long done_; long writes_; + long deletes_; int next_report_; size_t bytes_; double last_op_finish_; @@ -146,6 +150,7 @@ class Stats { hist_.Clear(); done_ = 0; writes_ = 0; + deletes_ = 0; bytes_ = 0; seconds_ = 0; start_ = FLAGS_env->NowMicros(); @@ -157,6 +162,7 @@ class Stats { hist_.Merge(other.hist_); done_ += other.done_; writes_ += other.writes_; + deletes_ += other.deletes_; bytes_ += other.bytes_; seconds_ += other.seconds_; if (other.start_ < start_) start_ = other.start_; @@ -199,6 +205,10 @@ class Stats { bytes_ += n; } + void AddOneDelete() { + deletes_ ++; + } + void Report(const char* name) { std::string extra; if (bytes_ < 1 || done_ < 1) { @@ -216,6 +226,7 @@ class Stats { seconds_ * 1e6 / done_, (long)throughput); fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n", "", bytes_mb, rate, (100*writes_)/done_, done_); + fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_); if (FLAGS_histogram) { fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); @@ -282,7 +293,7 @@ class SharedState { num_initialized_++; } - void IncPopulated() { + void IncOperated() { num_populated_++; } @@ -294,7 +305,7 @@ class SharedState { return num_initialized_ >= num_threads_; } - bool AllPopulated() const { + bool AllOperated() const { return num_populated_ >= num_threads_; } @@ -330,6 +341,10 @@ class SharedState { return values_[key]; } + void Delete(long key) const { + values_[key] = SENTINEL; + } + uint32_t GetSeed() const { return seed_; } @@ -405,8 +420,8 @@ class StressTest { FLAGS_env->StartThread(ThreadBody, threads[i]); } // Each thread goes through the following states: - // initializing -> wait for others to init -> populate - // wait for others to populate -> verify -> done + // initializing -> wait for others to init -> read/populate/depopulate + // wait for others to operate -> verify -> done { MutexLock l(shared.GetMutex()); @@ -414,10 +429,11 @@ class StressTest { shared.GetCondVar()->Wait(); } - fprintf(stdout, "Starting to populate db\n"); + fprintf(stdout, "Starting database operations\n"); + shared.SetStart(); shared.GetCondVar()->SignalAll(); - while (!shared.AllPopulated()) { + while (!shared.AllOperated()) { shared.GetCondVar()->Wait(); } @@ -438,7 +454,7 @@ class StressTest { delete threads[i]; threads[i] = NULL; } - fprintf(stdout, "Verification successfull\n"); + fprintf(stdout, "Verification successful\n"); PrintStatistics(); } @@ -458,13 +474,12 @@ class StressTest { shared->GetCondVar()->Wait(); } } - - thread->shared->GetStressTest()->PopulateDb(thread); + thread->shared->GetStressTest()->OperateDb(thread); { MutexLock l(shared->GetMutex()); - shared->IncPopulated(); - if (shared->AllPopulated()) { + shared->IncOperated(); + if (shared->AllOperated()) { shared->GetCondVar()->SignalAll(); } while (!shared->VerifyStarted()) { @@ -484,10 +499,10 @@ class StressTest { } - void PopulateDb(ThreadState* thread) { + void OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); WriteOptions write_opts; - char value[100], prev_value[100]; + char value[100]; long max_key = thread->shared->GetMaxKey(); std::string from_db; if (FLAGS_sync) { @@ -496,21 +511,31 @@ class StressTest { write_opts.disableWAL = FLAGS_disable_wal; thread->stats.Start(); - for (long i=0; i < FLAGS_ops_per_thread; i++) { + for (long i = 0; i < FLAGS_ops_per_thread; i++) { long rand_key = thread->rand.Next() % max_key; Slice key((char*)&rand_key, sizeof(rand_key)); - if (FLAGS_readwritepercent > thread->rand.Uniform(100)) { - // introduce some read load. + //Read:10%;Delete:30%;Write:60% + unsigned int probability_operation = thread->rand.Uniform(100); + if (probability_operation < FLAGS_readpercent) { + // read load db_->Get(read_opts, key, &from_db); + } else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) { + //introduce delete load + { + MutexLock l(thread->shared->GetMutexForKey(rand_key)); + thread->shared->Delete(rand_key); + db_->Delete(write_opts, key); + } + thread->stats.AddOneDelete(); } else { + // write load uint32_t value_base = thread->rand.Next(); size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); { MutexLock l(thread->shared->GetMutexForKey(rand_key)); if (FLAGS_verify_before_write) { - VerifyValue(rand_key, read_opts, *(thread->shared), prev_value, - sizeof(prev_value), &from_db, true); + VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true); } thread->shared->Put(rand_key, value_base); db_->Put(write_opts, key, v); @@ -525,12 +550,11 @@ class StressTest { void VerifyDb(const SharedState &shared, long start) const { ReadOptions options(FLAGS_verify_checksum, true); - char value[100]; long max_key = shared.GetMaxKey(); long step = shared.GetNumThreads(); for (long i = start; i < max_key; i+= step) { std::string from_db; - VerifyValue(i, options, shared, value, sizeof(value), &from_db); + VerifyValue(i, options, shared, &from_db, true); if (from_db.length()) { PrintKeyValue(i, from_db.data(), from_db.length()); } @@ -538,15 +562,16 @@ class StressTest { } void VerificationAbort(std::string msg, long key) const { - fprintf(stderr, "Verification failed for key %ld: %s\n", + fprintf(stderr, "Verification failed for key %ld: %s\n", key, msg.c_str()); exit(1); } void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared, - char *value, size_t value_sz, std::string *value_from_db, bool strict=false) const { Slice k((char*)&key, sizeof(key)); + char value[100]; + size_t value_sz = 0; uint32_t value_base = shared.Get(key); if (value_base == SharedState::SENTINEL && !strict) { return; @@ -594,7 +619,8 @@ class StressTest { kMajorVersion, kMinorVersion); fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); fprintf(stdout, "Ops per thread : %d\n", FLAGS_ops_per_thread); - fprintf(stdout, "Read percentage : %d\n", FLAGS_readwritepercent); + fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent); + fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key); fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); @@ -733,9 +759,12 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_sync = n; - } else if (sscanf(argv[i], "--readwritepercent=%d%c", &n, &junk) == 1 && - (n > 0 || n < 100)) { - FLAGS_readwritepercent = n; + } else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 && + (n >= 0 && n <= 100)) { + FLAGS_readpercent = n; + } else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 && + (n >= 0 && n <= 100)) { + FLAGS_delpercent = n; } else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_disable_data_sync = n; @@ -787,6 +816,11 @@ int main(int argc, char** argv) { } } + if ((FLAGS_readpercent + FLAGS_delpercent) > 100) { + fprintf(stderr, "Error: Read + Delete percents > 100!\n"); + exit(1); + } + // Choose a location for the test database if none given with --db= if (FLAGS_db == NULL) { leveldb::Env::Default()->GetTestDirectory(&default_db_path);