diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index db32ba0bc3..33826bab86 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -101,6 +101,11 @@ struct TransactionDBOptions { // ordering rather than concurrency control. bool skip_concurrency_control = false; + // This option is only valid for write unprepared. If a write batch exceeds + // this threshold, then the transaction will implicitly flush the currently + // pending writes into the database. A value of 0 or less means no limit. + ssize_t default_write_batch_flush_threshold = 0; + private: // 128 entries size_t wp_snapshot_cache_bits = static_cast<size_t>(7); @@ -162,6 +167,11 @@ struct TransactionOptions { // back/commit before new transactions start. // Default: false bool skip_concurrency_control = false; + + // See TransactionDBOptions::default_write_batch_flush_threshold for + // description. If a negative value is specified, then the default value from + // TransactionDBOptions is used. + ssize_t write_batch_flush_threshold = -1; }; // The per-write optimizations that do not involve transactions. TransactionDB diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 534103a545..98548dd955 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5303,16 +5303,8 @@ TEST_P(TransactionTest, MemoryLimitTest) { ASSERT_EQ(2, txn->GetNumPuts()); s = txn->Put(Slice("b"), Slice("....")); - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - // For write unprepared, write batches exceeding max_write_batch_size will - // just flush to DB instead of returning a memory limit error. 
- if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) { - ASSERT_TRUE(s.IsMemoryLimit()); - ASSERT_EQ(2, txn->GetNumPuts()); - } else { - ASSERT_OK(s); - ASSERT_EQ(3, txn->GetNumPuts()); - } + ASSERT_TRUE(s.IsMemoryLimit()); + ASSERT_EQ(2, txn->GetNumPuts()); txn->Rollback(); delete txn; diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index a2546229e4..feaedea067 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -157,7 +157,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) { Transaction* txn; TransactionOptions txn_options; // batch_size of 1 causes writes to DB for every marker. - txn_options.max_write_batch_size = 1; + txn_options.write_batch_flush_threshold = 1; ReadOptions read_options; for (uint32_t i = 0; i < kNumIter; i++) { @@ -311,7 +311,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { // batch_size of 1 causes writes to DB for every marker. for (size_t batch_size : {1, 1000000}) { - txn_options.max_write_batch_size = batch_size; + txn_options.write_batch_flush_threshold = batch_size; for (bool empty : {true, false}) { for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { for (int num_batches = 1; num_batches < 10; num_batches++) { @@ -332,7 +332,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { txn->SetName("xid"); for (int i = 0; i < num_batches; i++) { ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); - if (txn_options.max_write_batch_size == 1) { + if (txn_options.write_batch_flush_threshold == 1) { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); @@ -398,7 +398,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { // batch_size of 1 causes writes to DB for every marker. 
for (size_t batch_size : {1, 1000000}) { - txn_options.max_write_batch_size = batch_size; + txn_options.write_batch_flush_threshold = batch_size; for (bool prepare : {false, true}) { for (bool commit : {false, true}) { ReOpen(); @@ -408,7 +408,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { for (int i = 0; i < kNumKeys; i++) { txn->Put("k" + ToString(i), "v" + ToString(i)); - if (txn_options.max_write_batch_size == 1) { + if (txn_options.write_batch_flush_threshold == 1) { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); } else { ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); @@ -457,7 +457,7 @@ TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { WriteOptions write_options; TransactionOptions txn_options; // batch_size of 1 causes writes to DB for every marker. - txn_options.max_write_batch_size = 1; + txn_options.write_batch_flush_threshold = 1; const int kNumKeys = 10; WriteOptions wopts; diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 9265c3d4af..c677013aa0 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -35,13 +35,12 @@ WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, wupt_db_(txn_db), recovered_txn_(false), largest_validated_seq_(0) { - max_write_batch_size_ = txn_options.max_write_batch_size; - // We set max bytes to zero so that we don't get a memory limit error. - // Instead of trying to keep write batch strictly under the size limit, we - // just flush to DB when the limit is exceeded in write unprepared, to avoid - // having retry logic. This also allows very big key-value pairs that exceed - // max bytes to succeed. 
- write_batch_.SetMaxBytes(0); + if (txn_options.write_batch_flush_threshold < 0) { + write_batch_flush_threshold_ = + txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; + } else { + write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; + } } WriteUnpreparedTxn::~WriteUnpreparedTxn() { @@ -71,8 +70,13 @@ WriteUnpreparedTxn::~WriteUnpreparedTxn() { void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { PessimisticTransaction::Initialize(txn_options); - max_write_batch_size_ = txn_options.max_write_batch_size; - write_batch_.SetMaxBytes(0); + if (txn_options.write_batch_flush_threshold < 0) { + write_batch_flush_threshold_ = + txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold; + } else { + write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold; + } + unprep_seqs_.clear(); recovered_txn_ = false; largest_validated_seq_ = 0; @@ -222,8 +226,9 @@ Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) { Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; - if (max_write_batch_size_ != 0 && - write_batch_.GetDataSize() > max_write_batch_size_) { + if (write_batch_flush_threshold_ > 0 && + write_batch_.GetDataSize() > + static_cast<size_t>(write_batch_flush_threshold_)) { assert(GetState() != PREPARED); s = FlushWriteBatchToDB(!kPrepared); } diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index d81c30217d..feac749ee8 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -164,10 +164,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn { Status HandleWrite(std::function<Status()> do_write); // For write unprepared, we check on every writebatch append to see if - // max_write_batch_size_ has been exceeded, and then call + // write_batch_flush_threshold_ has been exceeded, and then call // FlushWriteBatchToDB if so. 
This logic is encapsulated in // MaybeFlushWriteBatchToDB. - size_t max_write_batch_size_; + ssize_t write_batch_flush_threshold_; WriteUnpreparedTxnDB* wupt_db_; // Ordered list of unprep_seq sequence numbers that we have already written