From 282f5a463ba0ebc0ad47d8f5aa3564046eb0eb5c Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Wed, 6 Nov 2024 17:32:03 -0800 Subject: [PATCH] Fix write committed transactions replay when UDT setting toggles (#13121) Summary: This PR adds some missing pieces in order to handle UDT setting toggles while replaying WALs for a WriteCommitted transaction DB. Specifically, the transaction markers for no-op, prepare, commit, and rollback are currently not carried over from the original WriteBatch to the new WriteBatch when a timestamp setting difference is detected. This PR fills that gap. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13121 Test Plan: Added unit tests Reviewed By: ltamasi Differential Revision: D65558801 Pulled By: jowlyzhang fbshipit-source-id: 8176882637b95f6dc0dad10d7fe21056fa5173d1 --- db/db_impl/db_impl_open.cc | 3 +- db/db_impl/db_impl_secondary.cc | 3 +- db/repair.cc | 5 +- db/write_batch.cc | 37 +- db/write_batch_internal.h | 9 + tools/ldb_cmd.cc | 1 + .../bug_fixes/write_committed_toggle_udt.md | 1 + util/udt_util.cc | 54 ++- util/udt_util.h | 35 +- util/udt_util_test.cc | 39 ++- .../write_committed_transaction_ts_test.cc | 324 ++++++++++++++++++ 11 files changed, 473 insertions(+), 38 deletions(-) create mode 100644 unreleased_history/bug_fixes/write_committed_toggle_udt.md diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 7e5f08897a..8f073711ee 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1275,7 +1275,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, reader.GetRecordedTimestampSize(); status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch); + TimestampSizeConsistencyMode::kReconcileInconsistency, seq_per_batch_, + batch_per_txn_, &new_batch); if (!status.ok()) { return status; } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc 
index d333b70d4c..eb285e53b7 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -233,7 +233,8 @@ Status DBImplSecondary::RecoverLogFiles( reader->GetRecordedTimestampSize(); status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency); + TimestampSizeConsistencyMode::kVerifyConsistency, seq_per_batch_, + batch_per_txn_); if (!status.ok()) { break; } diff --git a/db/repair.cc b/db/repair.cc index 7b063e64ee..8c0f36d991 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -413,9 +413,12 @@ class Repairer { if (record_status.ok()) { const UnorderedMap& record_ts_sz = reader.GetRecordedTimestampSize(); + // Use same value for `seq_per_batch` and `batch_per_txn` as + // WriteBatchInternal::InsertInto does below. record_status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true); if (record_status.ok()) { record_status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); diff --git a/db/write_batch.cc b/db/write_batch.cc index 7cb8f6d11d..7a5ad78192 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1160,6 +1160,34 @@ Status WriteBatchInternal::InsertNoop(WriteBatch* b) { return Status::OK(); } +ValueType WriteBatchInternal::GetBeginPrepareType(bool write_after_commit, + bool unprepared_batch) { + return write_after_commit + ? kTypeBeginPrepareXID + : (unprepared_batch ? 
kTypeBeginUnprepareXID + : kTypeBeginPersistedPrepareXID); +} + +Status WriteBatchInternal::InsertBeginPrepare(WriteBatch* b, + bool write_after_commit, + bool unprepared_batch) { + b->rep_.push_back(static_cast( + GetBeginPrepareType(write_after_commit, unprepared_batch))); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BEGIN_PREPARE, + std::memory_order_relaxed); + return Status::OK(); +} + +Status WriteBatchInternal::InsertEndPrepare(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeEndPrepareXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_END_PREPARE, + std::memory_order_relaxed); + return Status::OK(); +} + Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, bool write_after_commit, bool unprepared_batch) { @@ -1175,13 +1203,8 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, // rewrite noop as begin marker b->rep_[12] = static_cast( - write_after_commit ? kTypeBeginPrepareXID - : (unprepared_batch ? 
kTypeBeginUnprepareXID - : kTypeBeginPersistedPrepareXID)); - b->rep_.push_back(static_cast(kTypeEndPrepareXID)); - PutLengthPrefixedSlice(&b->rep_, xid); + GetBeginPrepareType(write_after_commit, unprepared_batch)); b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | - ContentFlags::HAS_END_PREPARE | ContentFlags::HAS_BEGIN_PREPARE, std::memory_order_relaxed); if (unprepared_batch) { @@ -1189,7 +1212,7 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, ContentFlags::HAS_BEGIN_UNPREPARE, std::memory_order_relaxed); } - return Status::OK(); + return WriteBatchInternal::InsertEndPrepare(b, xid); } Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index e3388d2fc5..3cf3f4689a 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -122,6 +122,15 @@ class WriteBatchInternal { static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, const Slice& key, const Slice& value); + static ValueType GetBeginPrepareType(bool write_after_commit, + bool unprepared_batch); + + static Status InsertBeginPrepare(WriteBatch* batch, + const bool write_after_commit = true, + bool unprepared_batch = false); + + static Status InsertEndPrepare(WriteBatch* batch, const Slice& xid); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, const bool write_after_commit = true, const bool unprepared_batch = false); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 3ce811972e..ecf886ca72 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -3053,6 +3053,7 @@ void DumpWalFile(Options options, const std::string& wal_file, status = HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, recorded_ts_sz, TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, /*new_batch=*/nullptr); if (!status.ok()) { std::stringstream oss; diff --git 
a/unreleased_history/bug_fixes/write_committed_toggle_udt.md b/unreleased_history/bug_fixes/write_committed_toggle_udt.md new file mode 100644 index 0000000000..920d8a8a65 --- /dev/null +++ b/unreleased_history/bug_fixes/write_committed_toggle_udt.md @@ -0,0 +1 @@ +*Fix a bug for replaying WALs for WriteCommitted transaction DB when its user-defined timestamps setting is toggled on/off between DB sessions. \ No newline at end of file diff --git a/util/udt_util.cc b/util/udt_util.cc index c349ddf586..7a0eeb2e3d 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -139,9 +139,17 @@ ToggleUDT CompareComparator(const Comparator* new_comparator, TimestampRecoveryHandler::TimestampRecoveryHandler( const UnorderedMap& running_ts_sz, - const UnorderedMap& record_ts_sz) + const UnorderedMap& record_ts_sz, bool seq_per_batch, + bool batch_per_txn) : running_ts_sz_(running_ts_sz), record_ts_sz_(record_ts_sz), + // Write after commit currently uses one seq per key (instead of per + // batch). So seq_per_batch being false indicates write_after_commit + // approach. + write_after_commit_(!seq_per_batch), + // WriteUnprepared can write multiple WriteBatches per transaction, so + // batch_per_txn being false indicates write_before_prepare. + write_before_prepare_(!batch_per_txn), new_batch_(new WriteBatch()), handler_valid_(true), new_batch_diff_from_orig_batch_(false) {} @@ -258,6 +266,43 @@ Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key, return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value); } +Status TimestampRecoveryHandler::MarkBeginPrepare(bool unprepare) { + // Transaction policy change requires empty WAL and User-defined timestamp is + // only supported for write committed txns. + // WriteBatch::Iterate will handle this based on + // handler->WriteAfterCommit() and handler->WriteBeforePrepare(). 
+ if (unprepare) { + return Status::InvalidArgument( + "Handle user defined timestamp setting change is not supported for" + "write unprepared policy. The WAL must be emptied."); + } + return WriteBatchInternal::InsertBeginPrepare(new_batch_.get(), + write_after_commit_, + /* unprepared_batch */ false); +} + +Status TimestampRecoveryHandler::MarkEndPrepare(const Slice& name) { + return WriteBatchInternal::InsertEndPrepare(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkCommit(const Slice& name) { + return WriteBatchInternal::MarkCommit(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkCommitWithTimestamp( + const Slice& name, const Slice& commit_ts) { + return WriteBatchInternal::MarkCommitWithTimestamp(new_batch_.get(), name, + commit_ts); +} + +Status TimestampRecoveryHandler::MarkRollback(const Slice& name) { + return WriteBatchInternal::MarkRollback(new_batch_.get(), name); +} + +Status TimestampRecoveryHandler::MarkNoop(bool /*empty_batch*/) { + return WriteBatchInternal::InsertNoop(new_batch_.get()); +} + Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) { assert(handler_valid_); @@ -304,8 +349,8 @@ Status HandleWriteBatchTimestampSizeDifference( const WriteBatch* batch, const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, - TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch) { + TimestampSizeConsistencyMode check_mode, bool seq_per_batch, + bool batch_per_txn, std::unique_ptr* new_batch) { // Quick path to bypass checking the WriteBatch. 
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { return Status::OK(); @@ -318,7 +363,8 @@ Status HandleWriteBatchTimestampSizeDifference( } else if (need_recovery) { assert(new_batch); SequenceNumber sequence = WriteBatchInternal::Sequence(batch); - TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); + TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz, + seq_per_batch, batch_per_txn); status = batch->Iterate(&recovery_handler); if (!status.ok()) { return status; diff --git a/util/udt_util.h b/util/udt_util.h index a926e8e812..51ea76e854 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -105,7 +105,8 @@ class UserDefinedTimestampSizeRecord { class TimestampRecoveryHandler : public WriteBatch::Handler { public: TimestampRecoveryHandler(const UnorderedMap& running_ts_sz, - const UnorderedMap& record_ts_sz); + const UnorderedMap& record_ts_sz, + bool seq_per_batch, bool batch_per_txn); ~TimestampRecoveryHandler() override {} @@ -135,19 +136,18 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice& value) override; - Status MarkBeginPrepare(bool) override { return Status::OK(); } + Status MarkBeginPrepare(bool unprepare) override; - Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + Status MarkEndPrepare(const Slice& name) override; - Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkCommit(const Slice& name) override; - Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { - return Status::OK(); - } + Status MarkCommitWithTimestamp(const Slice& name, + const Slice& commit_ts) override; - Status MarkRollback(const Slice&) override { return Status::OK(); } + Status MarkRollback(const Slice& name) override; - Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + Status MarkNoop(bool empty_batch) override; std::unique_ptr&& TransferNewBatch() { 
assert(new_batch_diff_from_orig_batch_); @@ -155,6 +155,16 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { return std::move(new_batch_); } + protected: + Handler::OptionState WriteBeforePrepare() const override { + return write_before_prepare_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + Handler::OptionState WriteAfterCommit() const override { + return write_after_commit_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + private: Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key, std::string* new_key_buf, @@ -168,6 +178,9 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { // in the WAL. This only contains non-zero user-defined timestamp size. const UnorderedMap& record_ts_sz_; + bool write_after_commit_; + bool write_before_prepare_; + std::unique_ptr new_batch_; // Handler is valid upon creation and becomes invalid after its `new_batch_` // is transferred. @@ -220,8 +233,8 @@ Status HandleWriteBatchTimestampSizeDifference( const WriteBatch* batch, const UnorderedMap& running_ts_sz, const UnorderedMap& record_ts_sz, - TimestampSizeConsistencyMode check_mode, - std::unique_ptr* new_batch = nullptr); + TimestampSizeConsistencyMode check_mode, bool seq_per_batch, + bool batch_per_txn, std::unique_ptr* new_batch = nullptr); // This util function is used when opening an existing column family and // processing its VersionEdit. It does a sanity check for the column family's diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index ecd5e1773f..b3248ddbf3 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -226,11 +226,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) { // All `check_mode` pass with OK status and `batch` not checked or updated. 
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -245,11 +247,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, // All `check_mode` pass with OK status and `batch` not checked or updated. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -263,11 +267,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { // All `check_mode` pass with OK status and `batch` not updated. 
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency)); + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true)); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() == nullptr); } @@ -282,13 +288,15 @@ TEST_F(HandleTimestampSizeDifferenceTest, // families. ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::nullopt /* dropped_cf */); @@ -307,13 +315,15 @@ TEST_F(HandleTimestampSizeDifferenceTest, // families. 
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t)); } @@ -331,7 +341,8 @@ TEST_F(HandleTimestampSizeDifferenceTest, // and all related entries copied over to the new WriteBatch. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch)); ASSERT_TRUE(new_batch.get() != nullptr); CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), @@ -346,12 +357,14 @@ TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency) + TimestampSizeConsistencyMode::kVerifyConsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( &batch, running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency) + TimestampSizeConsistencyMode::kReconcileInconsistency, + /* seq_per_batch */ false, /* batch_per_txn */ true) .IsInvalidArgument()); } diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc 
b/utilities/transactions/write_committed_transaction_ts_test.cc index 47b1a0df4d..d73371f80f 100644 --- a/utilities/transactions/write_committed_transaction_ts_test.cc +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -418,6 +418,21 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { ASSERT_OK(txn3->Prepare()); txn3.reset(); + std::unique_ptr txn4( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn4); + ASSERT_OK(txn4->SetName("no_op_txn")); + txn4.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); { @@ -452,9 +467,318 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { s = GetFromDb(ReadOptions(), handles_[1], "baz", /*ts=*/24, &value); ASSERT_TRUE(s.IsNotFound()); + + Transaction* no_op_txn = db->GetTransactionByName("no_op_txn"); + ASSERT_EQ(nullptr, no_op_txn); + + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/24, &value); + ASSERT_TRUE(s.IsNotFound()); } } +TEST_P(WriteCommittedTxnWithTsTest, EnabledUDTDisabledRecoverFromWal) { + // This feature is not compatible with UDT in memtable only. 
+ options.allow_concurrent_memtable_write = false; + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_opts.persist_user_defined_timestamps = false; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr no_op_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, no_op_txn); + ASSERT_OK(no_op_txn->SetName("no_op_txn")); + no_op_txn.reset(); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, prepared_but_uncommitted_txn); + ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_without_prepare_txn); + 
ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + assert(rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + + // Reopen and disable UDT to replay WAL entries. + cf_descs[1].options.comparator = BytewiseComparator(); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + Transaction* recovered_txn0 = db->GetTransactionByName("no_op_txn"); + ASSERT_EQ(nullptr, recovered_txn0); + + Transaction* recovered_txn1 = + db->GetTransactionByName("prepared_but_uncommitted_txn"); + ASSERT_NE(nullptr, recovered_txn1); + std::string value; + ASSERT_OK(recovered_txn1->Commit()); + Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_0", value); + s = db->Get(ReadOptions(), handles_[1], "foo1", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_1", value); + delete recovered_txn1; + + ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn")); + s = db->Get(ReadOptions(), handles_[0], "bar0", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_0", value); + s = db->Get(ReadOptions(), handles_[1], "bar1", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + ASSERT_EQ(nullptr, + db->GetTransactionByName("committed_without_prepare_txn")); + s = db->Get(ReadOptions(), handles_[0], "baz0", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_0", value); + s = 
db->Get(ReadOptions(), handles_[1], "baz1", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_1", value); + + ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn")); + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(ReadOptions(), handles_[1], "non_exist1", &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, UDTNewlyEnabledRecoverFromWal) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr no_op_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, no_op_txn); + ASSERT_OK(no_op_txn->SetName("no_op_txn")); + no_op_txn.reset(); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_NE(nullptr, prepared_but_uncommitted_txn); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[0], "foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + 
ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_without_prepare_txn); + ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + std::unique_ptr rolled_back_txn( + NewTxn(write_opts, TransactionOptions())); + ASSERT_NE(nullptr, rolled_back_txn); + ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare")); + ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare")); + ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn")); + ASSERT_OK(rolled_back_txn->Rollback()); + rolled_back_txn.reset(); + + // Reopen and enable UDT to replay WAL entries. 
+ options.allow_concurrent_memtable_write = false; + cf_descs[1].options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_descs[1].options.persist_user_defined_timestamps = false; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + Transaction* recovered_txn1 = + db->GetTransactionByName("prepared_but_uncommitted_txn"); + ASSERT_NE(nullptr, recovered_txn1); + std::string value; + ASSERT_OK(recovered_txn1->SetCommitTimestamp(23)); + ASSERT_OK(recovered_txn1->Commit()); + Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "foo1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("foo_value_1", value); + delete recovered_txn1; + + ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn")); + s = db->Get(ReadOptions(), handles_[0], "bar0", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "bar1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + ASSERT_EQ(nullptr, + db->GetTransactionByName("committed_without_prepare_txn")); + s = db->Get(ReadOptions(), handles_[0], "baz0", &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_0", value); + s = GetFromDb(ReadOptions(), handles_[1], "baz1", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("baz_value_1", value); + + ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn")); + s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value); + ASSERT_TRUE(s.IsNotFound()); + s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, ChangeFromWriteCommittedAndDisableUDT) { + // This feature is not compatible with UDT in memtable only. 
+ options.allow_concurrent_memtable_write = false; + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_opts.persist_user_defined_timestamps = false; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr prepared_but_uncommitted_txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(prepared_but_uncommitted_txn); + ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0")); + ASSERT_OK( + prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1")); + ASSERT_OK( + prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn")); + ASSERT_OK(prepared_but_uncommitted_txn->Prepare()); + + prepared_but_uncommitted_txn.reset(); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr committed_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_txn); + ASSERT_OK(committed_txn->Put("bar0", "bar_value_0")); + ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1")); + ASSERT_OK(committed_txn->SetName("committed_txn")); + ASSERT_OK(committed_txn->Prepare()); + ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(committed_txn->Commit()); + committed_txn.reset(); + + std::unique_ptr committed_without_prepare_txn( + NewTxn(write_opts, TransactionOptions())); + assert(committed_without_prepare_txn); + ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0")); + ASSERT_OK( + committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1")); + ASSERT_OK( + 
committed_without_prepare_txn->SetName("committed_without_prepare_txn")); + ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(committed_without_prepare_txn->Commit()); + committed_without_prepare_txn.reset(); + + // Disable UDT and change write policy. + cf_descs[1].options.comparator = BytewiseComparator(); + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; + ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_)); + + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED; + ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_)); +} + TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) { ASSERT_OK(ReOpenNoDelete());