diff --git a/src.mk b/src.mk index f980750288..37114717d4 100644 --- a/src.mk +++ b/src.mk @@ -101,6 +101,7 @@ LIB_SOURCES = \ util/env_posix.cc \ util/io_posix.cc \ util/thread_posix.cc \ + util/transaction_test_util.cc \ util/sst_file_manager_impl.cc \ util/file_util.cc \ util/file_reader_writer.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 65ce703f17..4f00375f80 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -36,33 +36,34 @@ #include "db/db_impl.h" #include "db/version_set.h" -#include "rocksdb/options.h" +#include "hdfs/env_hdfs.h" +#include "port/port.h" +#include "port/stack_trace.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" -#include "rocksdb/memtablerep.h" -#include "rocksdb/write_batch.h" -#include "rocksdb/slice.h" #include "rocksdb/filter_policy.h" -#include "rocksdb/rate_limiter.h" -#include "rocksdb/slice_transform.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/options.h" #include "rocksdb/perf_context.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/utilities/flashcache.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" -#include "rocksdb/utilities/optimistic_transaction_db.h" -#include "port/port.h" -#include "port/stack_trace.h" -#include "util/crc32c.h" +#include "rocksdb/write_batch.h" #include "util/compression.h" +#include "util/crc32c.h" #include "util/histogram.h" #include "util/mutexlock.h" #include "util/random.h" -#include "util/string_util.h" #include "util/statistics.h" +#include "util/string_util.h" #include "util/testutil.h" +#include "util/transaction_test_util.h" #include "util/xxhash.h" -#include "hdfs/env_hdfs.h" #include "utilities/merge_operators.h" #ifdef OS_WIN @@ -3763,18 +3764,22 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); Duration duration(FLAGS_duration, readwrites_); ReadOptions read_options(FLAGS_verify_checksum, true); - std::string value; - DB* db = db_.db; - uint64_t transactions_done = 0; - uint64_t transactions_aborted = 0; - Status s; uint64_t num_prefix_ranges = FLAGS_transaction_sets; + uint64_t transactions_done = 0; if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) { fprintf(stderr, "invalid value for transaction_sets\n"); abort(); } + TransactionOptions txn_options; + txn_options.lock_timeout = FLAGS_transaction_lock_timeout; + txn_options.set_snapshot = FLAGS_transaction_set_snapshot; + + RandomTransactionInserter inserter(&thread->rand, write_options_, + read_options, FLAGS_num, + num_prefix_ranges); + if (FLAGS_num_multi_db > 1) { fprintf(stderr, "Cannot run RandomTransaction benchmark with " @@ -3783,126 +3788,26 @@ class Benchmark { } while (!duration.Done(1)) { - Transaction* txn = nullptr; - WriteBatch* batch = nullptr; + bool success; + // RandomTransactionInserter will attempt to insert a key for each + // # of FLAGS_transaction_sets if (FLAGS_optimistic_transaction_db) { - txn = db_.opt_txn_db->BeginTransaction(write_options_); - assert(txn); + success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db); } else if (FLAGS_transaction_db) { TransactionDB* txn_db = reinterpret_cast(db_.db); - - TransactionOptions txn_options; - txn_options.lock_timeout = FLAGS_transaction_lock_timeout; - - txn = txn_db->BeginTransaction(write_options_, txn_options); - assert(txn); + success = inserter.TransactionDBInsert(txn_db, txn_options); } else { - batch = new WriteBatch(); + success = inserter.DBInsert(db_.db); } - if (txn && FLAGS_transaction_set_snapshot) { - txn->SetSnapshot(); - } - - // pick a random number to use to increment a key in each set - uint64_t incr = (thread->rand.Next() % 100) + 1; - - bool failed = false; - // For each set, pick a key at random and increment it - for (uint8_t i = 0; i < num_prefix_ranges; i++) { - uint64_t int_value; - char prefix_buf[5]; - - // key format: [SET#][random#] - std::string rand_key = ToString(thread->rand.Next() % FLAGS_num); - Slice base_key(rand_key); - - // Pad prefix appropriately so we can iterate over each set - snprintf(prefix_buf, sizeof(prefix_buf), "%04d", i + 1); - std::string full_key = std::string(prefix_buf) + base_key.ToString(); - Slice key(full_key); - - if (txn) { - s = txn->GetForUpdate(read_options, key, &value); - } else { - s = db->Get(read_options, key, &value); - } - - if (s.ok()) { - int_value = std::stoull(value); - - if (int_value == 0 || int_value == ULONG_MAX) { - fprintf(stderr, "Get returned unexpected value: %s\n", - value.c_str()); - abort(); - } - } else if (s.IsNotFound()) { - int_value = 0; - } else if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { - fprintf(stderr, "Get returned an unexpected error: %s\n", - s.ToString().c_str()); - abort(); - } else { - failed = true; - break; - } - - if (FLAGS_transaction_sleep > 0) { - FLAGS_env->SleepForMicroseconds(thread->rand.Next() % - FLAGS_transaction_sleep); - } - - std::string sum = ToString(int_value + incr); - if (txn) { - s = txn->Put(key, sum); - if (!s.ok()) { - // Since we did a GetForUpdate, Put should not fail. - fprintf(stderr, "Put returned an unexpected error: %s\n", - s.ToString().c_str()); - abort(); - } - } else { - batch->Put(key, sum); - } - } - - if (txn) { - if (failed) { - transactions_aborted++; - txn->Rollback(); - s = Status::OK(); - } else { - s = txn->Commit(); - } - } else { - s = db->Write(write_options_, batch); - } - - if (!s.ok()) { - failed = true; - - // Ideally, we'd want to run this stress test with enough concurrency - // on a small enough set of keys that we get some failed transactions - // due to conflicts. - if (FLAGS_optimistic_transaction_db && - (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { - transactions_aborted++; - } else if (FLAGS_transaction_db && s.IsExpired()) { - transactions_aborted++; - } else { - fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str()); - abort(); - } - } - - delete txn; - delete batch; - - if (!failed) { - thread->stats.FinishedOps(nullptr, db, 1, kOthers); + if (!success) { + fprintf(stderr, "Unexpected error: %s\n", + inserter.GetLastStatus().ToString().c_str()); + abort(); } + thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers); transactions_done++; } @@ -3910,7 +3815,7 @@ class Benchmark { if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) { snprintf(msg, sizeof(msg), "( transactions:%" PRIu64 " aborts:%" PRIu64 ")", - transactions_done, transactions_aborted); + transactions_done, inserter.GetFailureCount()); } else { snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done); } @@ -3930,50 +3835,14 @@ class Benchmark { return; } - uint64_t prev_total = 0; + Status s = + RandomTransactionInserter::Verify(db_.db, FLAGS_transaction_sets); - // For each set of keys with the same prefix, sum all the values - for (uint32_t i = 0; i < FLAGS_transaction_sets; i++) { - char prefix_buf[5]; - snprintf(prefix_buf, sizeof(prefix_buf), "%04u", i + 1); - uint64_t total = 0; - - Iterator* iter = db_.db->NewIterator(ReadOptions()); - - for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { - Slice key = iter->key(); - - // stop when we reach a different prefix - if (key.ToString().compare(0, 4, prefix_buf) != 0) { - break; - } - - Slice value = iter->value(); - uint64_t int_value = std::stoull(value.ToString()); - if (int_value == 0 || int_value == ULONG_MAX) { - fprintf(stderr, "Iter returned unexpected value: %s\n", - value.ToString().c_str()); - abort(); - } - - total += int_value; - } - delete iter; - - if (i > 0) { - if (total != prev_total) { - fprintf(stderr, - "RandomTransactionVerify found inconsistent totals. " - "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 - " \n", - i - 1, prev_total, i, total); - abort(); - } - } - prev_total = total; + if (s.ok()) { + fprintf(stdout, "RandomTransactionVerify Success.\n"); + } else { + fprintf(stdout, "RandomTransactionVerify FAILED!!\n"); } - - fprintf(stdout, "RandomTransactionVerify Success!\n"); } #endif // ROCKSDB_LITE diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc new file mode 100644 index 0000000000..7ec990374f --- /dev/null +++ b/util/transaction_test_util.cc @@ -0,0 +1,237 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "util/transaction_test_util.h" + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "util/random.h" +#include "util/string_util.h" + +namespace rocksdb { + +RandomTransactionInserter::RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options, + const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets) + : rand_(rand), + write_options_(write_options), + read_options_(read_options), + num_keys_(num_keys), + num_sets_(num_sets) {} + +RandomTransactionInserter::~RandomTransactionInserter() { + if (txn_ != nullptr) { + delete txn_; + } + if (optimistic_txn_ != nullptr) { + delete optimistic_txn_; + } +} + +bool RandomTransactionInserter::TransactionDBInsert( + TransactionDB* db, const TransactionOptions& txn_options) { + txn_ = db->BeginTransaction(write_options_, txn_options, txn_); + + return DoInsert(nullptr, txn_, false); +} + +bool RandomTransactionInserter::OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options) { + optimistic_txn_ = + db->BeginTransaction(write_options_, txn_options, optimistic_txn_); + + return DoInsert(nullptr, optimistic_txn_, true); +} + +bool RandomTransactionInserter::DBInsert(DB* db) { + return DoInsert(db, nullptr, false); +} + +bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, + bool is_optimistic) { + Status s; + WriteBatch batch; + std::string value; + + // pick a random number to use to increment a key in each set + uint64_t incr = (rand_->Next() % 100) + 1; + + bool unexpected_error = false; + + // For each set, pick a key at random and increment it + for (uint8_t i = 0; i < num_sets_; i++) { + uint64_t int_value = 0; + char prefix_buf[5]; + // prefix_buf needs to be large enough to hold a uint16 in string form + + // key format: [SET#][random#] + std::string rand_key = ToString(rand_->Next() % num_keys_); + Slice base_key(rand_key); + + // Pad prefix appropriately so we can iterate over each set + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); + std::string full_key = std::string(prefix_buf) + base_key.ToString(); + Slice key(full_key); + + if (txn != nullptr) { + s = txn->GetForUpdate(read_options_, key, &value); + } else { + s = db->Get(read_options_, key, &value); + } + + if (s.ok()) { + // Found key, parse its value + int_value = std::stoull(value); + + if (int_value == 0 || int_value == ULONG_MAX) { + unexpected_error = true; + fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); + s = Status::Corruption(); + } + } else if (s.IsNotFound()) { + // Have not yet written to this key, so assume its value is 0 + int_value = 0; + s = Status::OK(); + } else { + // Optimistic transactions should never return non-ok status here. + // Non-optimistic transactions may return write-coflict/timeout errors. + if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + fprintf(stderr, "Get returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + break; + } + + if (s.ok()) { + // Increment key + std::string sum = ToString(int_value + incr); + if (txn != nullptr) { + s = txn->Put(key, sum); + if (!s.ok()) { + // Since we did a GetForUpdate, Put should not fail. + fprintf(stderr, "Put returned an unexpected error: %s\n", + s.ToString().c_str()); + unexpected_error = true; + } + } else { + batch.Put(key, sum); + } + } + } + + if (s.ok()) { + if (txn != nullptr) { + s = txn->Commit(); + + if (!s.ok()) { + if (is_optimistic) { + // Optimistic transactions can have write-conflict errors on commit. + // Any other error is unexpected. + if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { + unexpected_error = true; + } + } else { + // Non-optimistic transactions should only fail due to expiration + // or write failures. For testing purproses, we do not expect any + // write failures. + if (!s.IsExpired()) { + unexpected_error = true; + } + } + + if (unexpected_error) { + fprintf(stderr, "Commit returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + + } else { + s = db->Write(write_options_, &batch); + if (!s.ok()) { + unexpected_error = true; + fprintf(stderr, "Write returned an unexpected error: %s\n", + s.ToString().c_str()); + } + } + } else { + if (txn != nullptr) { + txn->Rollback(); + } + } + + if (s.ok()) { + success_count_++; + } else { + failure_count_++; + } + + last_status_ = s; + + // return success if we didn't get any unexpected errors + return !unexpected_error; +} + +Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets) { + uint64_t prev_total = 0; + + // For each set of keys with the same prefix, sum all the values + for (uint32_t i = 0; i < num_sets; i++) { + char prefix_buf[5]; + snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); + uint64_t total = 0; + + Iterator* iter = db->NewIterator(ReadOptions()); + + for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + + // stop when we reach a different prefix + if (key.ToString().compare(0, 4, prefix_buf) != 0) { + break; + } + + Slice value = iter->value(); + uint64_t int_value = std::stoull(value.ToString()); + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Iter returned unexpected value: %s\n", + value.ToString().c_str()); + return Status::Corruption(); + } + + total += int_value; + } + delete iter; + + if (i > 0) { + if (total != prev_total) { + fprintf(stderr, + "RandomTransactionVerify found inconsistent totals. " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 + " \n", + i - 1, prev_total, i, total); + return Status::Corruption(); + } + } + prev_total = total; + } + + return Status::OK(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/util/transaction_test_util.h b/util/transaction_test_util.h new file mode 100644 index 0000000000..c9885fc5f0 --- /dev/null +++ b/util/transaction_test_util.h @@ -0,0 +1,111 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#ifndef ROCKSDB_LITE + +#include "rocksdb/options.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction_db.h" + +namespace rocksdb { + +class DB; +class Random64; + +// Utility class for stress testing transactions. Can be used to write many +// transactions in parallel and then validate that the data written is logically +// consistent. This class assumes the input DB is initially empty. +// +// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will +// increment the value of a key in #num_sets sets of keys. Regardless of +// whether the transaction succeeds, the total sum of values of keys in each +// set is an invariant that should remain equal. +// +// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many +// times, Verify() can be called to validate that the invariant holds. +// +// To test writing Transaction in parallel, multiple threads can create a +// RandomTransactionInserter with similar arguments using the same DB. +class RandomTransactionInserter { + public: + // num_keys is the number of keys in each set. + // num_sets is the number of sets of keys. + explicit RandomTransactionInserter( + Random64* rand, const WriteOptions& write_options = WriteOptions(), + const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000, + uint16_t num_sets = 3); + + ~RandomTransactionInserter(); + + // Increment a key in each set using a Transaction on a TransactionDB. + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool TransactionDBInsert( + TransactionDB* db, + const TransactionOptions& txn_options = TransactionOptions()); + + // Increment a key in each set using a Transaction on an + // OptimisticTransactionDB + // + // Returns true if the transaction succeeded OR if any error encountered was + // expected (eg a write-conflict). Error status may be obtained by calling + // GetLastStatus(); + bool OptimisticTransactionDBInsert( + OptimisticTransactionDB* db, + const OptimisticTransactionOptions& txn_options = + OptimisticTransactionOptions()); + // Increment a key in each set without using a transaction. If this function + // is called in parallel, then Verify() may fail. + // + // Returns true if the write succeeds. + // Error status may be obtained by calling GetLastStatus(). + bool DBInsert(DB* db); + + // Returns OK if Invariant is true. + static Status Verify(DB* db, uint16_t num_sets); + + // Returns the status of the previous Insert operation + Status GetLastStatus() { return last_status_; } + + // Returns the number of successfully written calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert + uint64_t GetSuccessCount() { return success_count_; } + + // Returns the number of calls to + // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not + // write any data. + uint64_t GetFailureCount() { return failure_count_; } + + private: + // Input options + Random64* rand_; + const WriteOptions write_options_; + const ReadOptions read_options_; + const uint64_t num_keys_; + const uint16_t num_sets_; + + // Number of successful insert batches performed + uint64_t success_count_ = 0; + + // Number of failed insert batches attempted + uint64_t failure_count_ = 0; + + // Status returned by most recent insert operation + Status last_status_; + + // optimization: re-use allocated transaction objects. + Transaction* txn_ = nullptr; + Transaction* optimistic_txn_ = nullptr; + + bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index b672b8722a..688f3d11a3 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -6,12 +6,16 @@ #ifndef ROCKSDB_LITE #include +#include #include "rocksdb/db.h" -#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction.h" +#include "util/crc32c.h" #include "util/logging.h" +#include "util/random.h" #include "util/testharness.h" +#include "util/transaction_test_util.h" using std::string; @@ -1267,88 +1271,70 @@ TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) { delete txn1; } -TEST_F(OptimisticTransactionTest, ReinitializeTest) { +namespace { +Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db, + const size_t num_transactions, + const size_t num_sets, + const size_t num_keys_per_set) { + size_t seed = std::hash()(std::this_thread::get_id()); + Random64 _rand(seed); WriteOptions write_options; ReadOptions read_options; OptimisticTransactionOptions txn_options; - string value; - Status s; - - Transaction* txn1 = txn_db->BeginTransaction(write_options, txn_options); - - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - - s = txn1->Put("Z", "z"); - ASSERT_OK(s); - - s = txn1->Commit(); - ASSERT_OK(s); - - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - - s = txn1->Put("Z", "zz"); - ASSERT_OK(s); - - // Reinitilize txn1 and verify that zz is not written - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - - s = txn1->Commit(); - ASSERT_OK(s); - s = db->Get(read_options, "Z", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "z"); - - // Verify snapshots get reinitialized correctly - txn1->SetSnapshot(); - s = txn1->Put("Z", "zzzz"); - ASSERT_OK(s); - - s = txn1->Commit(); - ASSERT_OK(s); - - s = db->Get(read_options, "Z", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "zzzz"); - - const Snapshot* snapshot = txn1->GetSnapshot(); - ASSERT_TRUE(snapshot); - - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - snapshot = txn1->GetSnapshot(); - ASSERT_FALSE(snapshot); - txn_options.set_snapshot = true; - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - snapshot = txn1->GetSnapshot(); - ASSERT_TRUE(snapshot); - s = txn1->Put("Z", "a"); + RandomTransactionInserter inserter(&_rand, write_options, read_options, + num_keys_per_set, num_sets); + + for (size_t t = 0; t < num_transactions; t++) { + bool success = inserter.OptimisticTransactionDBInsert(db, txn_options); + if (!success) { + // unexpected failure + return inserter.GetLastStatus(); + } + } + + // Make sure at least some of the transactions succeeded. It's ok if + // some failed due to write-conflicts. + if (inserter.GetFailureCount() > num_transactions / 2) { + return Status::TryAgain("Too many transactions failed! " + + std::to_string(inserter.GetFailureCount()) + " / " + + std::to_string(num_transactions)); + } + + return Status::OK(); +} +} // namespace + +TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) { + const size_t num_threads = 4; + const size_t num_transactions_per_thread = 10000; + const size_t num_sets = 3; + const size_t num_keys_per_set = 100; + // Setting the key-space to be 100 keys should cause enough write-conflicts + // to make this test interesting. + + std::vector threads; + + std::function call_inserter = [&] { + ASSERT_OK(OptimisticTransactionStressTestInserter( + txn_db, num_transactions_per_thread, num_sets, num_keys_per_set)); + }; + + // Create N threads that use RandomTransactionInserter to write + // many transactions. + for (uint32_t i = 0; i < num_threads; i++) { + threads.emplace_back(call_inserter); + } + + // Wait for all threads to run + for (auto& t : threads) { + t.join(); + } + + // Verify that data is consistent + Status s = RandomTransactionInserter::Verify(db, num_sets); ASSERT_OK(s); - - txn1->Rollback(); - - s = txn1->Put("Y", "y"); - ASSERT_OK(s); - - txn_options.set_snapshot = false; - txn1 = txn_db->BeginTransaction(write_options, txn_options, txn1); - snapshot = txn1->GetSnapshot(); - ASSERT_FALSE(snapshot); - - s = txn1->Put("X", "x"); - ASSERT_OK(s); - - s = txn1->Commit(); - ASSERT_OK(s); - - s = db->Get(read_options, "Z", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "zzzz"); - - s = db->Get(read_options, "Y", &value); - ASSERT_TRUE(s.IsNotFound()); - - delete txn1; } } // namespace rocksdb diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 6f40e5e6a0..f9bb7d96d6 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include +#include #include "db/db_impl.h" #include "rocksdb/db.h" @@ -14,9 +15,11 @@ #include "rocksdb/utilities/transaction_db.h" #include "table/mock_table.h" #include "util/logging.h" +#include "util/random.h" #include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/transaction_test_util.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" @@ -2980,6 +2983,72 @@ TEST_F(TransactionTest, ExpiredTransactionDataRace1) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +namespace { +Status TransactionStressTestInserter(TransactionDB* db, + const size_t num_transactions, + const size_t num_sets, + const size_t num_keys_per_set) { + size_t seed = std::hash()(std::this_thread::get_id()); + Random64 _rand(seed); + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + txn_options.set_snapshot = true; + + RandomTransactionInserter inserter(&_rand, write_options, read_options, + num_keys_per_set, num_sets); + + for (size_t t = 0; t < num_transactions; t++) { + bool success = inserter.TransactionDBInsert(db, txn_options); + if (!success) { + // unexpected failure + return inserter.GetLastStatus(); + } + } + + // Make sure at least some of the transactions succeeded. It's ok if + // some failed due to write-conflicts. + if (inserter.GetFailureCount() > num_transactions / 2) { + return Status::TryAgain("Too many transactions failed! " + + std::to_string(inserter.GetFailureCount()) + " / " + + std::to_string(num_transactions)); + } + + return Status::OK(); +} +} // namespace + +TEST_F(TransactionTest, TransactionStressTest) { + const size_t num_threads = 4; + const size_t num_transactions_per_thread = 10000; + const size_t num_sets = 3; + const size_t num_keys_per_set = 100; + // Setting the key-space to be 100 keys should cause enough write-conflicts + // to make this test interesting. + + std::vector threads; + + std::function call_inserter = [&] { + ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread, + num_sets, num_keys_per_set)); + }; + + // Create N threads that use RandomTransactionInserter to write + // many transactions. + for (uint32_t i = 0; i < num_threads; i++) { + threads.emplace_back(call_inserter); + } + + // Wait for all threads to run + for (auto& t : threads) { + t.join(); + } + + // Verify that data is consistent + Status s = RandomTransactionInserter::Verify(db, num_sets); + ASSERT_OK(s); +} + } // namespace rocksdb int main(int argc, char** argv) {