diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index b0abd7c4a5..6cb748b4a3 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -112,7 +112,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, - true /*concurrent_memtable_writes*/); + true /*concurrent_memtable_writes*/, seq_per_batch_); } if (write_thread_.CompleteParallelMemTableWriter(&w)) { @@ -286,7 +286,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, - this, true /*concurrent_memtable_writes*/); + this, true /*concurrent_memtable_writes*/, seq_per_batch_); } } if (seq_used != nullptr) { diff --git a/db/write_batch.cc b/db/write_batch.cc index d3107f5513..39d18c694b 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1421,8 +1421,8 @@ Status WriteBatchInternal::InsertInto( nullptr /*has_valid_writes*/, seq_per_batch); for (auto w : write_group) { if (!w->ShouldWriteToMemtable()) { - inserter.MaybeAdvanceSeq(true); w->sequence = inserter.sequence(); + inserter.MaybeAdvanceSeq(true); continue; } SetSequence(w->batch, inserter.sequence()); @@ -1436,17 +1436,16 @@ Status WriteBatchInternal::InsertInto( return Status::OK(); } -Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, - SequenceNumber sequence, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t log_number, DB* db, - bool concurrent_memtable_writes) { +Status WriteBatchInternal::InsertInto( + WriteThread::Writer* writer, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + 
bool concurrent_memtable_writes, bool seq_per_batch) { assert(writer->ShouldWriteToMemtable()); MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, log_number, db, - concurrent_memtable_writes); + concurrent_memtable_writes, + nullptr /*has_valid_writes*/, seq_per_batch); SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index abe70751f5..7c2f42d493 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -180,7 +180,8 @@ class WriteBatchInternal { FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - bool concurrent_memtable_writes = false); + bool concurrent_memtable_writes = false, + bool seq_per_batch = false); static Status Append(WriteBatch* dst, const WriteBatch* src, const bool WAL_only = false); diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index adc7dd5ce0..a76446ba95 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -55,6 +55,7 @@ void TransactionBaseImpl::Reinitialize(DB* db, const WriteOptions& write_options) { Clear(); ClearSnapshot(); + id_ = 0; db_ = db; name_.clear(); log_number_ = 0; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index bb03a22b5e..0c73c422b1 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -39,20 +39,23 @@ using std::string; namespace rocksdb { -// TODO(myabandeh): Instantiate the tests with other write policies -INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(false, false, - WRITE_COMMITTED))); -INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, - 
::testing::Values(std::make_tuple(true, false, - WRITE_COMMITTED))); +// TODO(myabandeh): Instantiate the tests with concurrent_prepare +INSTANTIATE_TEST_CASE_P( + DBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED))); +INSTANTIATE_TEST_CASE_P( + StackableDBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED), + std::make_tuple(true, false, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), std::make_tuple(false, true, WRITE_COMMITTED), std::make_tuple(true, false, WRITE_COMMITTED), - std::make_tuple(true, true, WRITE_COMMITTED))); - + std::make_tuple(true, true, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED), + std::make_tuple(true, false, WRITE_PREPARED))); TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -784,9 +787,20 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { // heap should not care about prepared section anymore ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - // but now our memtable should be referencing the prep section - ASSERT_EQ(log_containing_prep, - db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } db_impl->TEST_FlushMemTable(true); @@ -1096,9 +1110,20 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { // heap should not care about prepared section anymore 
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - // but now our memtable should be referencing the prep section - ASSERT_EQ(log_containing_prep, - db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } db_impl->TEST_FlushMemTable(true); @@ -1443,9 +1468,20 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), txn2->GetLogNumber()); - // we should see txn1s log refernced by the memtables - ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), - txn1->GetLogNumber()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // we should see txn1s log refernced by the memtables + ASSERT_EQ(txn1->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } // flush default cf to crate new log s = db->Put(wopts, "foo", "bar2"); @@ -1463,17 +1499,39 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) { // heap should not show any logs ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - // should show the first txn log - ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), - txn1->GetLogNumber()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // should show the first txn log + ASSERT_EQ(txn1->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + 
break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } // flush only cfa memtable s = db_impl->TEST_FlushMemTable(true, cfa); ASSERT_OK(s); - // should show the first txn log - ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), - txn2->GetLogNumber()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // should show the first txn log + ASSERT_EQ(txn2->GetLogNumber(), + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } // flush only cfb memtable s = db_impl->TEST_FlushMemTable(true, cfb); @@ -1545,8 +1603,20 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { ASSERT_OK(s); ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no); - ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no); - ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // This cf is empty and should ref the latest log + ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no); + ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber()); + break; + case WRITE_PREPARED: + // This cf is not flushed yet and should ref the log that has its data + ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no); + break; + case WRITE_UNPREPARED: + default: + assert(false); + } ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), prepare_log_no); ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); @@ -1555,7 +1625,19 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { s = txn1->Commit(); ASSERT_OK(s); - ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 
prepare_log_no); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), + prepare_log_no); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0); + break; + default: + assert(false); + } ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); @@ -1569,8 +1651,19 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { // assert that cfa has a flush requested ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); - // cfb should not be flushed becuse it has no data from LOG A - ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested()); + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // cfb should not be flushed because it has no data from LOG A + ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // cfb should be flushed because it has prepared data from LOG A + ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested()); + break; + default: + assert(false); + } // cfb now has data from LOG A s = txn2->Commit(); @@ -2690,8 +2783,12 @@ TEST_P(TransactionTest, UntrackedWrites) { // Untracked writes should succeed even though key was written after snapshot s = txn->PutUntracked("untracked", "1"); ASSERT_OK(s); - s = txn->MergeUntracked("untracked", "2"); - ASSERT_OK(s); + if (txn_db_options.write_policy != WRITE_PREPARED) { + // WRITE_PREPARED does not currently support dup merge keys. + // TODO(myabandeh): remove this if-then when the support is added + s = txn->MergeUntracked("untracked", "2"); + ASSERT_OK(s); + } s = txn->DeleteUntracked("untracked"); ASSERT_OK(s); @@ -4062,6 +4159,11 @@ TEST_P(TransactionTest, SingleDeleteTest) { } TEST_P(TransactionTest, MergeTest) { + if (txn_db_options.write_policy == WRITE_PREPARED) { + // WRITE_PREPARED does not currently support dup merge keys. 
+ // TODO(myabandeh): remove this if-then when the support is added + return; + } WriteOptions write_options; ReadOptions read_options; string value; diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 1a9daaddc1..d776003dd2 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -113,6 +113,9 @@ class TransactionTest : public ::testing::TestWithParam< std::vector handles; DB* root_db; Options options_copy(options); + if (txn_db_options.write_policy == WRITE_PREPARED) { + options_copy.seq_per_batch = true; + } Status s = DB::Open(options_copy, dbname, column_families, &handles, &root_db); if (s.ok()) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8cae885aa0..3b82427168 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1513,7 +1513,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { // A more complex test to verify compaction/flush should keep keys visible // to snapshots. 
TEST_P(WritePreparedTransactionTest, - DISABLED_CompactionShouldKeepSnapshotVisibleKeysRandomized) { + CompactionShouldKeepSnapshotVisibleKeysRandomized) { constexpr size_t kNumTransactions = 10; constexpr size_t kNumIterations = 1000; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 3458ef5a81..ef83c972db 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -26,14 +26,13 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options), wpt_db_(txn_db) { - PessimisticTransaction::Initialize(txn_options); GetWriteBatch()->DisableDuplicateMergeKeys(); } Status WritePreparedTxn::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { - auto snapshot = GetSnapshot(); + auto snapshot = read_options.snapshot; auto snap_seq = snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; @@ -95,9 +94,7 @@ Status WritePreparedTxn::CommitInternal() { // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); - // TODO(myabandeh): prevent the users from writing to txn after the prepare - // phase - assert(working_batch->Count() == 0); + const bool empty = working_batch->Count() == 0; WriteBatchInternal::MarkCommit(working_batch, name_); // any operations appended to this working_batch will be ignored from WAL @@ -109,14 +106,21 @@ Status WritePreparedTxn::CommitInternal() { // a connection between the memtable and its WAL, so there is no need to // redundantly reference the log that contains the prepared data. 
const uint64_t zero_log_number = 0ull; - auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, - zero_log_number, disable_memtable, &seq_used); + auto s = db_impl_->WriteImpl( + write_options_, working_batch, nullptr, nullptr, zero_log_number, + empty ? disable_memtable : !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& commit_seq = seq_used; // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode // commit_seq. This happens if prep_seq <<< commit_seq. auto prepare_seq = GetId(); wpt_db_->AddCommitted(prepare_seq, commit_seq); + if (!empty) { + // Commit the data that is accompanied with the commit marker + // TODO(myabandeh): skip AddPrepared + wpt_db_->AddPrepared(commit_seq); + wpt_db_->AddCommitted(commit_seq, commit_seq); + } return s; }