diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 7e5f08897a..8f073711ee 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1275,7 +1275,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, reader.GetRecordedTimestampSize(); status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch); + TimestampSizeConsistencyMode::kReconcileInconsistency, seq_per_batch_, + batch_per_txn_, &new_batch); if (!status.ok()) { return status; } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index d333b70d4c..eb285e53b7 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -233,7 +233,8 @@ Status DBImplSecondary::RecoverLogFiles( reader->GetRecordedTimestampSize(); status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency); + TimestampSizeConsistencyMode::kVerifyConsistency, seq_per_batch_, + batch_per_txn_); if (!status.ok()) { break; } diff --git a/db/repair.cc b/db/repair.cc index 7b063e64ee..8c0f36d991 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -413,9 +413,12 @@ class Repairer { if (record_status.ok()) { const UnorderedMap& record_ts_sz = reader.GetRecordedTimestampSize(); + // Use same value for `seq_per_batch` and `batch_per_txn` as + // WriteBatchInternal::InsertInto does below. 
record_status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true); if (record_status.ok()) { record_status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); diff --git a/db/write_batch.cc b/db/write_batch.cc index 7cb8f6d11d..7a5ad78192 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1160,6 +1160,34 @@ Status WriteBatchInternal::InsertNoop(WriteBatch* b) { return Status::OK(); } +ValueType WriteBatchInternal::GetBeginPrepareType(bool write_after_commit, + bool unprepared_batch) { + return write_after_commit + ? kTypeBeginPrepareXID + : (unprepared_batch ? kTypeBeginUnprepareXID + : kTypeBeginPersistedPrepareXID); +} + +Status WriteBatchInternal::InsertBeginPrepare(WriteBatch* b, + bool write_after_commit, + bool unprepared_batch) { + b->rep_.push_back(static_cast( + GetBeginPrepareType(write_after_commit, unprepared_batch))); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BEGIN_PREPARE, + std::memory_order_relaxed); + return Status::OK(); +} + +Status WriteBatchInternal::InsertEndPrepare(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeEndPrepareXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_END_PREPARE, + std::memory_order_relaxed); + return Status::OK(); +} + Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, bool write_after_commit, bool unprepared_batch) { @@ -1175,13 +1203,8 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, // rewrite noop as begin marker b->rep_[12] = static_cast( - write_after_commit ? kTypeBeginPrepareXID - : (unprepared_batch ? 
kTypeBeginUnprepareXID - : kTypeBeginPersistedPrepareXID)); - b->rep_.push_back(static_cast(kTypeEndPrepareXID)); - PutLengthPrefixedSlice(&b->rep_, xid); + GetBeginPrepareType(write_after_commit, unprepared_batch)); b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | - ContentFlags::HAS_END_PREPARE | ContentFlags::HAS_BEGIN_PREPARE, std::memory_order_relaxed); if (unprepared_batch) { @@ -1189,7 +1212,7 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, ContentFlags::HAS_BEGIN_UNPREPARE, std::memory_order_relaxed); } - return Status::OK(); + return WriteBatchInternal::InsertEndPrepare(b, xid); } Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index e3388d2fc5..3cf3f4689a 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -122,6 +122,15 @@ class WriteBatchInternal { static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, const Slice& key, const Slice& value); + static ValueType GetBeginPrepareType(bool write_after_commit, + bool unprepared_batch); + + static Status InsertBeginPrepare(WriteBatch* batch, + const bool write_after_commit = true, + bool unprepared_batch = false); + + static Status InsertEndPrepare(WriteBatch* batch, const Slice& xid); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, const bool write_after_commit = true, const bool unprepared_batch = false); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 3ce811972e..ecf886ca72 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -3053,6 +3053,7 @@ void DumpWalFile(Options options, const std::string& wal_file, status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, recorded_ts_sz, TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, /*new_batch=*/nullptr); if (!status.ok()) { std::stringstream oss; diff --git 
a/unreleased_history/bug_fixes/write_committed_toggle_udt.md b/unreleased_history/bug_fixes/write_committed_toggle_udt.md new file mode 100644 index 0000000000..920d8a8a65 --- /dev/null +++ b/unreleased_history/bug_fixes/write_committed_toggle_udt.md @@ -0,0 +1 @@ +* Fix a bug for replaying WALs for WriteCommitted transaction DB when its user-defined timestamps setting is toggled on/off between DB sessions. \ No newline at end of file diff --git a/util/udt_util.cc b/util/udt_util.cc index c349ddf586..7a0eeb2e3d 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -139,9 +139,17 @@ ToggleUDT CompareComparator(const Comparator* new_comparator, TimestampRecoveryHandler::TimestampRecoveryHandler( const UnorderedMap& running_ts_sz, - const UnorderedMap& record_ts_sz) + const UnorderedMap& record_ts_sz, bool seq_per_batch, + bool batch_per_txn) : running_ts_sz_(running_ts_sz), record_ts_sz_(record_ts_sz), + // Write after commit currently uses one seq per key (instead of per + // batch). So seq_per_batch being false indicates write_after_commit + // approach. + write_after_commit_(!seq_per_batch), + // WriteUnprepared can write multiple WriteBatches per transaction, so + // batch_per_txn being false indicates write_before_prepare. + write_before_prepare_(!batch_per_txn), new_batch_(new WriteBatch()), handler_valid_(true), new_batch_diff_from_orig_batch_(false) {} @@ -258,6 +266,43 @@ Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key, return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value); } +Status TimestampRecoveryHandler::MarkBeginPrepare(bool unprepare) { + // Transaction policy change requires empty WAL and User-defined timestamp is + // only supported for write committed txns. + // WriteBatch::Iterate will handle this based on + // handler->WriteAfterCommit() and handler->WriteBeforePrepare(). 
+ if (unprepare) { + return Status::InvalidArgument( + "Handle user defined timestamp setting change is not supported for " + "write unprepared policy. The WAL must be emptied."); + } + return WriteBatchInternal::InsertBeginPrepare(new_batch_.get(), + write_after_commit_, + /* unprepared_batch */ false); +} + +Status TimestampRecoveryHandler::MarkEndPrepare(const Slice& name) { + return WriteBatchInternal::InsertEndPrepare(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkCommit(const Slice& name) { + return WriteBatchInternal::MarkCommit(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkCommitWithTimestamp( + const Slice& name, const Slice& commit_ts) { + return WriteBatchInternal::MarkCommitWithTimestamp(new_batch_.get(), name, + commit_ts); +} + +Status TimestampRecoveryHandler::MarkRollback(const Slice& name) { + return WriteBatchInternal::MarkRollback(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkNoop(bool /*empty_batch*/) { + return WriteBatchInternal::InsertNoop(new_batch_.get()); +} + Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) { assert(handler_valid_); @@ -304,8 +349,8 @@ Status HandleWriteBatchTimestampSizeDifference( const WriteBatch* batch, const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, - TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch) { + TimestampSizeConsistencyMode check_mode, bool seq_per_batch, + bool batch_per_txn, std::unique_ptr* new_batch) { // Quick path to bypass checking the WriteBatch. 
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { return Status::OK(); @@ -318,7 +363,8 @@ Status HandleWriteBatchTimestampSizeDifference( } else if (need_recovery) { assert(new_batch); SequenceNumber sequence = WriteBatchInternal::Sequence(batch); - TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); + TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz, + seq_per_batch, batch_per_txn); status = batch->Iterate(&recovery_handler); if (!status.ok()) { return status; diff --git a/util/udt_util.h b/util/udt_util.h index a926e8e812..51ea76e854 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -105,7 +105,8 @@ class UserDefinedTimestampSizeRecord { class TimestampRecoveryHandler : public WriteBatch::Handler { public: TimestampRecoveryHandler(const UnorderedMap& running_ts_sz, - const UnorderedMap& record_ts_sz); + const UnorderedMap& record_ts_sz, + bool seq_per_batch, bool batch_per_txn); ~TimestampRecoveryHandler() override {} @@ -135,19 +136,18 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice& value) override; - Status MarkBeginPrepare(bool) override { return Status::OK(); } + Status MarkBeginPrepare(bool unprepare) override; - Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + Status MarkEndPrepare(const Slice& name) override; - Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkCommit(const Slice& name) override; - Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { - return Status::OK(); - } + Status MarkCommitWithTimestamp(const Slice& name, + const Slice& commit_ts) override; - Status MarkRollback(const Slice&) override { return Status::OK(); } + Status MarkRollback(const Slice& name) override; - Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + Status MarkNoop(bool empty_batch) override; std::unique_ptr&& TransferNewBatch() { 
assert(new_batch_diff_from_orig_batch_); @@ -155,6 +155,16 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { return std::move(new_batch_); } + protected: + Handler::OptionState WriteBeforePrepare() const override { + return write_before_prepare_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + Handler::OptionState WriteAfterCommit() const override { + return write_after_commit_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + private: Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key, std::string* new_key_buf, @@ -168,6 +178,9 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { // in the WAL. This only contains non-zero user-defined timestamp size. const UnorderedMap& record_ts_sz_; + bool write_after_commit_; + bool write_before_prepare_; + std::unique_ptr new_batch_; // Handler is valid upon creation and becomes invalid after its `new_batch_` // is transferred. @@ -220,8 +233,8 @@ Status HandleWriteBatchTimestampSizeDifference( const WriteBatch* batch, const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, - TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch = nullptr); + TimestampSizeConsistencyMode check_mode, bool seq_per_batch, + bool batch_per_txn, std::unique_ptr* new_batch = nullptr); // This util function is used when opening an existing column family and // processing its VersionEdit. It does a sanity check for the column family's diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index ecd5e1773f..b3248ddbf3 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -226,11 +226,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) { // All `check_mode` pass with OK status and `batch` not checked or updated. 
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -245,11 +247,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, // All `check_mode` pass with OK status and `batch` not checked or updated. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -263,11 +267,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { // All `check_mode` pass with OK status and `batch` not updated. 
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -282,13 +288,15 @@ TEST_F(HandleTimestampSizeDifferenceTest, // families. ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::nullopt /* dropped_cf */); @@ -307,13 +315,15 @@ TEST_F(HandleTimestampSizeDifferenceTest, // families. 
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t)); } @@ -331,7 +341,8 @@ TEST_F(HandleTimestampSizeDifferenceTest, // and all related entries copied over to the new WriteBatch. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), @@ -346,12 +357,14 @@ TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency) + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); } diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc 
b/utilities/transactions/write_committed_transaction_ts_test.cc index 47b1a0df4d..d73371f80f 100644 --- a/utilities/transactions/write_committed_transaction_ts_test.cc +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -418,6 +418,21 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { ASSERT_OK(txn3->Prepare()); txn3.reset(); + std::unique_ptr txn4( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn4); + ASSERT_OK(txn4->SetName("no_op_txn")); + txn4.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); { @@ -452,9 +467,318 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { s = GetFromDb(ReadOptions(), handles_[1], "baz", /*ts=*/24, &value); ASSERT_TRUE(s.IsNotFound()); + + Transaction* no_op_txn = db->GetTransactionByName("no_op_txn"); + ASSERT_EQ(nullptr, no_op_txn); + + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/24, &value); + ASSERT_TRUE(s.IsNotFound()); } } +TEST_P(WriteCommittedTxnWithTsTest, EnabledUDTDisabledRecoverFromWal) { + // This feature is not compatible with UDT in memtable only. 
+ options.allow_concurrent_memtable_write = false; + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_opts.persist_user_defined_timestamps = false; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr no_op_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, no_op_txn); + ASSERT_OK(no_op_txn->SetName("no_op_txn")); + no_op_txn.reset(); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, prepared_but_uncommitted_txn); + ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_without_prepare_txn); + 
ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + assert(rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + + // Reopen and disable UDT to replay WAL entries. + cf_descs[1].options.comparator = BytewiseComparator(); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + Transaction* recovered_txn0 = db->GetTransactionByName("no_op_txn"); + ASSERT_EQ(nullptr, recovered_txn0); + + Transaction* recovered_txn1 = + db->GetTransactionByName("prepared_but_uncommitted_txn"); + ASSERT_NE(nullptr, recovered_txn1); + std::string value; + ASSERT_OK(recovered_txn1->Commit()); + Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_0", value); + s = db->Get(ReadOptions(), handles_[1], "foo1", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_1", value); + delete recovered_txn1; + + ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn")); + s = db->Get(ReadOptions(), handles_[0], "bar0", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_0", value); + s = db->Get(ReadOptions(), handles_[1], "bar1", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + ASSERT_EQ(nullptr, + db->GetTransactionByName("committed_without_prepare_txn")); + s = db->Get(ReadOptions(), handles_[0], "baz0", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_0", value); + s = 
db->Get(ReadOptions(), handles_[1], "baz1", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_1", value); + + ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn")); + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ReadOptions(), handles_[1], "non_exist1", &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, UDTNewlyEnabledRecoverFromWal) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr no_op_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, no_op_txn); + ASSERT_OK(no_op_txn->SetName("no_op_txn")); + no_op_txn.reset(); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, prepared_but_uncommitted_txn); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[0], "foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + 
ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_without_prepare_txn); + ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + + // Reopen and enable UDT to replay WAL entries. 
+ options.allow_concurrent_memtable_write = false; + cf_descs[1].options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_descs[1].options.persist_user_defined_timestamps = false; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + Transaction* recovered_txn1 = + db->GetTransactionByName("prepared_but_uncommitted_txn"); + ASSERT_NE(nullptr, recovered_txn1); + std::string value; + ASSERT_OK(recovered_txn1->SetCommitTimestamp(23)); + ASSERT_OK(recovered_txn1->Commit()); + Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "foo1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_1", value); + delete recovered_txn1; + + ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn")); + s = db->Get(ReadOptions(), handles_[0], "bar0", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "bar1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + ASSERT_EQ(nullptr, + db->GetTransactionByName("committed_without_prepare_txn")); + s = db->Get(ReadOptions(), handles_[0], "baz0", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "baz1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_1", value); + + ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn")); + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, ChangeFromWriteCommittedAndDisableUDT) { + // This feature is not compatible with UDT in memtable only. 
+ options.allow_concurrent_memtable_write = false; + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_opts.persist_user_defined_timestamps = false; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(prepared_but_uncommitted_txn); + ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_without_prepare_txn); + ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + 
committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + // Disable UDT and change write policy. + cf_descs[1].options.comparator = BytewiseComparator(); + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; + ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_)); + + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED; + ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_)); +} + TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) { ASSERT_OK(ReOpenNoDelete());