Fix write committed transactions replay when UDT setting toggles (#13121)

Summary:
This PR adds some missing pieces in order to handle UDT setting toggles while replay WALs for WriteCommitted transactions DB. Specifically, all the transaction markers for no op, prepare, commit, rollback are currently not carried over from the original WriteBatch to the new WriteBatch when there is a timestamp setting difference detected. This PR fills that gap.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13121

Test Plan: Added unit tests

Reviewed By: ltamasi

Differential Revision: D65558801

Pulled By: jowlyzhang

fbshipit-source-id: 8176882637b95f6dc0dad10d7fe21056fa5173d1
This commit is contained in:
Yu Zhang 2024-11-06 17:32:03 -08:00 committed by Facebook GitHub Bot
parent 2ba4dceb4c
commit 282f5a463b
11 changed files with 473 additions and 38 deletions

View File

@ -1275,7 +1275,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
reader.GetRecordedTimestampSize(); reader.GetRecordedTimestampSize();
status = HandleWriteBatchTimestampSizeDifference( status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch); TimestampSizeConsistencyMode::kReconcileInconsistency, seq_per_batch_,
batch_per_txn_, &new_batch);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }

View File

@ -233,7 +233,8 @@ Status DBImplSecondary::RecoverLogFiles(
reader->GetRecordedTimestampSize(); reader->GetRecordedTimestampSize();
status = HandleWriteBatchTimestampSizeDifference( status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency); TimestampSizeConsistencyMode::kVerifyConsistency, seq_per_batch_,
batch_per_txn_);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }

View File

@ -413,9 +413,12 @@ class Repairer {
if (record_status.ok()) { if (record_status.ok()) {
const UnorderedMap<uint32_t, size_t>& record_ts_sz = const UnorderedMap<uint32_t, size_t>& record_ts_sz =
reader.GetRecordedTimestampSize(); reader.GetRecordedTimestampSize();
// Use same value for `seq_per_batch` and `batch_per_txn` as
// WriteBatchInternal::InsertInto does below.
record_status = HandleWriteBatchTimestampSizeDifference( record_status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency); TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true);
if (record_status.ok()) { if (record_status.ok()) {
record_status = record_status =
WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);

View File

@ -1160,6 +1160,34 @@ Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
return Status::OK(); return Status::OK();
} }
ValueType WriteBatchInternal::GetBeginPrepareType(bool write_after_commit,
bool unprepared_batch) {
return write_after_commit
? kTypeBeginPrepareXID
: (unprepared_batch ? kTypeBeginUnprepareXID
: kTypeBeginPersistedPrepareXID);
}
Status WriteBatchInternal::InsertBeginPrepare(WriteBatch* b,
bool write_after_commit,
bool unprepared_batch) {
b->rep_.push_back(static_cast<char>(
GetBeginPrepareType(write_after_commit, unprepared_batch)));
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_BEGIN_PREPARE,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::InsertEndPrepare(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_END_PREPARE,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
bool write_after_commit, bool write_after_commit,
bool unprepared_batch) { bool unprepared_batch) {
@ -1175,13 +1203,8 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
// rewrite noop as begin marker // rewrite noop as begin marker
b->rep_[12] = static_cast<char>( b->rep_[12] = static_cast<char>(
write_after_commit ? kTypeBeginPrepareXID GetBeginPrepareType(write_after_commit, unprepared_batch));
: (unprepared_batch ? kTypeBeginUnprepareXID
: kTypeBeginPersistedPrepareXID));
b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_END_PREPARE |
ContentFlags::HAS_BEGIN_PREPARE, ContentFlags::HAS_BEGIN_PREPARE,
std::memory_order_relaxed); std::memory_order_relaxed);
if (unprepared_batch) { if (unprepared_batch) {
@ -1189,7 +1212,7 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
ContentFlags::HAS_BEGIN_UNPREPARE, ContentFlags::HAS_BEGIN_UNPREPARE,
std::memory_order_relaxed); std::memory_order_relaxed);
} }
return Status::OK(); return WriteBatchInternal::InsertEndPrepare(b, xid);
} }
Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {

View File

@ -122,6 +122,15 @@ class WriteBatchInternal {
static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value); const Slice& key, const Slice& value);
static ValueType GetBeginPrepareType(bool write_after_commit,
bool unprepared_batch);
static Status InsertBeginPrepare(WriteBatch* batch,
const bool write_after_commit = true,
bool unprepared_batch = false);
static Status InsertEndPrepare(WriteBatch* batch, const Slice& xid);
static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
const bool write_after_commit = true, const bool write_after_commit = true,
const bool unprepared_batch = false); const bool unprepared_batch = false);

View File

@ -3053,6 +3053,7 @@ void DumpWalFile(Options options, const std::string& wal_file,
status = HandleWriteBatchTimestampSizeDifference( status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, recorded_ts_sz, &batch, running_ts_sz, recorded_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true,
/*new_batch=*/nullptr); /*new_batch=*/nullptr);
if (!status.ok()) { if (!status.ok()) {
std::stringstream oss; std::stringstream oss;

View File

@ -0,0 +1 @@
*Fix a bug for replaying WALs for WriteCommitted transaction DB when its user-defined timestamps setting is toggled on/off between DB sessions.

View File

@ -139,9 +139,17 @@ ToggleUDT CompareComparator(const Comparator* new_comparator,
TimestampRecoveryHandler::TimestampRecoveryHandler( TimestampRecoveryHandler::TimestampRecoveryHandler(
const UnorderedMap<uint32_t, size_t>& running_ts_sz, const UnorderedMap<uint32_t, size_t>& running_ts_sz,
const UnorderedMap<uint32_t, size_t>& record_ts_sz) const UnorderedMap<uint32_t, size_t>& record_ts_sz, bool seq_per_batch,
bool batch_per_txn)
: running_ts_sz_(running_ts_sz), : running_ts_sz_(running_ts_sz),
record_ts_sz_(record_ts_sz), record_ts_sz_(record_ts_sz),
// Write after commit currently uses one seq per key (instead of per
// batch). So seq_per_batch being false indicates write_after_commit
// approach.
write_after_commit_(!seq_per_batch),
// WriteUnprepared can write multiple WriteBatches per transaction, so
// batch_per_txn being false indicates write_before_prepare.
write_before_prepare_(!batch_per_txn),
new_batch_(new WriteBatch()), new_batch_(new WriteBatch()),
handler_valid_(true), handler_valid_(true),
new_batch_diff_from_orig_batch_(false) {} new_batch_diff_from_orig_batch_(false) {}
@ -258,6 +266,43 @@ Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key,
return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value); return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value);
} }
Status TimestampRecoveryHandler::MarkBeginPrepare(bool unprepare) {
// Transaction policy change requires empty WAL and User-defined timestamp is
// only supported for write committed txns.
// WriteBatch::Iterate has will handle this based on
// handler->WriteAfterCommit() and handler->WriteBeforePrepare().
if (unprepare) {
return Status::InvalidArgument(
"Handle user defined timestamp setting change is not supported for"
"write unprepared policy. The WAL must be emptied.");
}
return WriteBatchInternal::InsertBeginPrepare(new_batch_.get(),
write_after_commit_,
/* unprepared_batch */ false);
}
Status TimestampRecoveryHandler::MarkEndPrepare(const Slice& name) {
return WriteBatchInternal::InsertEndPrepare(new_batch_.get(), name);
}
Status TimestampRecoveryHandler::MarkCommit(const Slice& name) {
return WriteBatchInternal::MarkCommit(new_batch_.get(), name);
}
Status TimestampRecoveryHandler::MarkCommitWithTimestamp(
const Slice& name, const Slice& commit_ts) {
return WriteBatchInternal::MarkCommitWithTimestamp(new_batch_.get(), name,
commit_ts);
}
Status TimestampRecoveryHandler::MarkRollback(const Slice& name) {
return WriteBatchInternal::MarkRollback(new_batch_.get(), name);
}
Status TimestampRecoveryHandler::MarkNoop(bool /*empty_batch*/) {
return WriteBatchInternal::InsertNoop(new_batch_.get());
}
Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) { uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) {
assert(handler_valid_); assert(handler_valid_);
@ -304,8 +349,8 @@ Status HandleWriteBatchTimestampSizeDifference(
const WriteBatch* batch, const WriteBatch* batch,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, const UnorderedMap<uint32_t, size_t>& running_ts_sz,
const UnorderedMap<uint32_t, size_t>& record_ts_sz, const UnorderedMap<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode, TimestampSizeConsistencyMode check_mode, bool seq_per_batch,
std::unique_ptr<WriteBatch>* new_batch) { bool batch_per_txn, std::unique_ptr<WriteBatch>* new_batch) {
// Quick path to bypass checking the WriteBatch. // Quick path to bypass checking the WriteBatch.
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
return Status::OK(); return Status::OK();
@ -318,7 +363,8 @@ Status HandleWriteBatchTimestampSizeDifference(
} else if (need_recovery) { } else if (need_recovery) {
assert(new_batch); assert(new_batch);
SequenceNumber sequence = WriteBatchInternal::Sequence(batch); SequenceNumber sequence = WriteBatchInternal::Sequence(batch);
TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz,
seq_per_batch, batch_per_txn);
status = batch->Iterate(&recovery_handler); status = batch->Iterate(&recovery_handler);
if (!status.ok()) { if (!status.ok()) {
return status; return status;

View File

@ -105,7 +105,8 @@ class UserDefinedTimestampSizeRecord {
class TimestampRecoveryHandler : public WriteBatch::Handler { class TimestampRecoveryHandler : public WriteBatch::Handler {
public: public:
TimestampRecoveryHandler(const UnorderedMap<uint32_t, size_t>& running_ts_sz, TimestampRecoveryHandler(const UnorderedMap<uint32_t, size_t>& running_ts_sz,
const UnorderedMap<uint32_t, size_t>& record_ts_sz); const UnorderedMap<uint32_t, size_t>& record_ts_sz,
bool seq_per_batch, bool batch_per_txn);
~TimestampRecoveryHandler() override {} ~TimestampRecoveryHandler() override {}
@ -135,19 +136,18 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
Status PutBlobIndexCF(uint32_t cf, const Slice& key, Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkBeginPrepare(bool unprepare) override;
Status MarkEndPrepare(const Slice&) override { return Status::OK(); } Status MarkEndPrepare(const Slice& name) override;
Status MarkCommit(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice& name) override;
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { Status MarkCommitWithTimestamp(const Slice& name,
return Status::OK(); const Slice& commit_ts) override;
}
Status MarkRollback(const Slice&) override { return Status::OK(); } Status MarkRollback(const Slice& name) override;
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } Status MarkNoop(bool empty_batch) override;
std::unique_ptr<WriteBatch>&& TransferNewBatch() { std::unique_ptr<WriteBatch>&& TransferNewBatch() {
assert(new_batch_diff_from_orig_batch_); assert(new_batch_diff_from_orig_batch_);
@ -155,6 +155,16 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
return std::move(new_batch_); return std::move(new_batch_);
} }
protected:
Handler::OptionState WriteBeforePrepare() const override {
return write_before_prepare_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
Handler::OptionState WriteAfterCommit() const override {
return write_after_commit_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
private: private:
Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key, Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key,
std::string* new_key_buf, std::string* new_key_buf,
@ -168,6 +178,9 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
// in the WAL. This only contains non-zero user-defined timestamp size. // in the WAL. This only contains non-zero user-defined timestamp size.
const UnorderedMap<uint32_t, size_t>& record_ts_sz_; const UnorderedMap<uint32_t, size_t>& record_ts_sz_;
bool write_after_commit_;
bool write_before_prepare_;
std::unique_ptr<WriteBatch> new_batch_; std::unique_ptr<WriteBatch> new_batch_;
// Handler is valid upon creation and becomes invalid after its `new_batch_` // Handler is valid upon creation and becomes invalid after its `new_batch_`
// is transferred. // is transferred.
@ -220,8 +233,8 @@ Status HandleWriteBatchTimestampSizeDifference(
const WriteBatch* batch, const WriteBatch* batch,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, const UnorderedMap<uint32_t, size_t>& running_ts_sz,
const UnorderedMap<uint32_t, size_t>& record_ts_sz, const UnorderedMap<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode, TimestampSizeConsistencyMode check_mode, bool seq_per_batch,
std::unique_ptr<WriteBatch>* new_batch = nullptr); bool batch_per_txn, std::unique_ptr<WriteBatch>* new_batch = nullptr);
// This util function is used when opening an existing column family and // This util function is used when opening an existing column family and
// processing its VersionEdit. It does a sanity check for the column family's // processing its VersionEdit. It does a sanity check for the column family's

View File

@ -226,11 +226,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) {
// All `check_mode` pass with OK status and `batch` not checked or updated. // All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency)); TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true));
std::unique_ptr<WriteBatch> new_batch(nullptr); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() == nullptr); ASSERT_TRUE(new_batch.get() == nullptr);
} }
@ -245,11 +247,13 @@ TEST_F(HandleTimestampSizeDifferenceTest,
// All `check_mode` pass with OK status and `batch` not checked or updated. // All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency)); TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true));
std::unique_ptr<WriteBatch> new_batch(nullptr); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() == nullptr); ASSERT_TRUE(new_batch.get() == nullptr);
} }
@ -263,11 +267,13 @@ TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) {
// All `check_mode` pass with OK status and `batch` not updated. // All `check_mode` pass with OK status and `batch` not updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency)); TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true));
std::unique_ptr<WriteBatch> new_batch(nullptr); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() == nullptr); ASSERT_TRUE(new_batch.get() == nullptr);
} }
@ -282,13 +288,15 @@ TEST_F(HandleTimestampSizeDifferenceTest,
// families. // families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency) TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true)
.IsInvalidArgument()); .IsInvalidArgument());
std::unique_ptr<WriteBatch> new_batch(nullptr); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() != nullptr); ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t),
std::nullopt /* dropped_cf */); std::nullopt /* dropped_cf */);
@ -307,13 +315,15 @@ TEST_F(HandleTimestampSizeDifferenceTest,
// families. // families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency) TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true)
.IsInvalidArgument()); .IsInvalidArgument());
std::unique_ptr<WriteBatch> new_batch(nullptr); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() != nullptr); ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t)); CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t));
} }
@ -331,7 +341,8 @@ TEST_F(HandleTimestampSizeDifferenceTest,
// and all related entries copied over to the new WriteBatch. // and all related entries copied over to the new WriteBatch.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true, &new_batch));
ASSERT_TRUE(new_batch.get() != nullptr); ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t),
@ -346,12 +357,14 @@ TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) {
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency) TimestampSizeConsistencyMode::kVerifyConsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true)
.IsInvalidArgument()); .IsInvalidArgument());
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency) TimestampSizeConsistencyMode::kReconcileInconsistency,
/* seq_per_batch */ false, /* batch_per_txn */ true)
.IsInvalidArgument()); .IsInvalidArgument());
} }

View File

@ -418,6 +418,21 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) {
ASSERT_OK(txn3->Prepare()); ASSERT_OK(txn3->Prepare());
txn3.reset(); txn3.reset();
std::unique_ptr<Transaction> txn4(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn4);
ASSERT_OK(txn4->SetName("no_op_txn"));
txn4.reset();
std::unique_ptr<Transaction> rolled_back_txn(
NewTxn(write_opts, TransactionOptions()));
ASSERT_NE(nullptr, rolled_back_txn);
ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare"));
ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare"));
ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn"));
ASSERT_OK(rolled_back_txn->Rollback());
rolled_back_txn.reset();
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
{ {
@ -452,9 +467,318 @@ TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) {
s = GetFromDb(ReadOptions(), handles_[1], "baz", /*ts=*/24, &value); s = GetFromDb(ReadOptions(), handles_[1], "baz", /*ts=*/24, &value);
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
Transaction* no_op_txn = db->GetTransactionByName("no_op_txn");
ASSERT_EQ(nullptr, no_op_txn);
s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value);
ASSERT_TRUE(s.IsNotFound());
s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/24, &value);
ASSERT_TRUE(s.IsNotFound());
} }
} }
TEST_P(WriteCommittedTxnWithTsTest, EnabledUDTDisabledRecoverFromWal) {
// This feature is not compatible with UDT in memtable only.
options.allow_concurrent_memtable_write = false;
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
cf_opts.persist_user_defined_timestamps = false;
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> no_op_txn(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_NE(nullptr, no_op_txn);
ASSERT_OK(no_op_txn->SetName("no_op_txn"));
no_op_txn.reset();
std::unique_ptr<Transaction> prepared_but_uncommitted_txn(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_NE(nullptr, prepared_but_uncommitted_txn);
ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0"));
ASSERT_OK(
prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1"));
ASSERT_OK(
prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn"));
ASSERT_OK(prepared_but_uncommitted_txn->Prepare());
prepared_but_uncommitted_txn.reset();
WriteOptions write_opts;
write_opts.sync = true;
std::unique_ptr<Transaction> committed_txn(
NewTxn(write_opts, TransactionOptions()));
ASSERT_NE(nullptr, committed_txn);
ASSERT_OK(committed_txn->Put("bar0", "bar_value_0"));
ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1"));
ASSERT_OK(committed_txn->SetName("committed_txn"));
ASSERT_OK(committed_txn->Prepare());
ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23));
ASSERT_OK(committed_txn->Commit());
committed_txn.reset();
std::unique_ptr<Transaction> committed_without_prepare_txn(
NewTxn(write_opts, TransactionOptions()));
ASSERT_NE(nullptr, committed_without_prepare_txn);
ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0"));
ASSERT_OK(
committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1"));
ASSERT_OK(
committed_without_prepare_txn->SetName("committed_without_prepare_txn"));
ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24));
ASSERT_OK(committed_without_prepare_txn->Commit());
committed_without_prepare_txn.reset();
std::unique_ptr<Transaction> rolled_back_txn(
NewTxn(write_opts, TransactionOptions()));
assert(rolled_back_txn);
ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare"));
ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare"));
ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn"));
ASSERT_OK(rolled_back_txn->Rollback());
rolled_back_txn.reset();
// Reopen and disable UDT to replay WAL entries.
cf_descs[1].options.comparator = BytewiseComparator();
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
{
Transaction* recovered_txn0 = db->GetTransactionByName("no_op_txn");
ASSERT_EQ(nullptr, recovered_txn0);
Transaction* recovered_txn1 =
db->GetTransactionByName("prepared_but_uncommitted_txn");
ASSERT_NE(nullptr, recovered_txn1);
std::string value;
ASSERT_OK(recovered_txn1->Commit());
Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value);
ASSERT_OK(s);
ASSERT_EQ("foo_value_0", value);
s = db->Get(ReadOptions(), handles_[1], "foo1", &value);
ASSERT_OK(s);
ASSERT_EQ("foo_value_1", value);
delete recovered_txn1;
ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn"));
s = db->Get(ReadOptions(), handles_[0], "bar0", &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_0", value);
s = db->Get(ReadOptions(), handles_[1], "bar1", &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
ASSERT_EQ(nullptr,
db->GetTransactionByName("committed_without_prepare_txn"));
s = db->Get(ReadOptions(), handles_[0], "baz0", &value);
ASSERT_OK(s);
ASSERT_EQ("baz_value_0", value);
s = db->Get(ReadOptions(), handles_[1], "baz1", &value);
ASSERT_OK(s);
ASSERT_EQ("baz_value_1", value);
ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn"));
s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ReadOptions(), handles_[1], "non_exist1", &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_P(WriteCommittedTxnWithTsTest, UDTNewlyEnabledRecoverFromWal) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> no_op_txn(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_NE(nullptr, no_op_txn);
ASSERT_OK(no_op_txn->SetName("no_op_txn"));
no_op_txn.reset();
std::unique_ptr<Transaction> prepared_but_uncommitted_txn(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_NE(nullptr, prepared_but_uncommitted_txn);
ASSERT_OK(
prepared_but_uncommitted_txn->Put(handles_[0], "foo0", "foo_value_0"));
ASSERT_OK(
prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1"));
ASSERT_OK(
prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn"));
ASSERT_OK(prepared_but_uncommitted_txn->Prepare());
prepared_but_uncommitted_txn.reset();
WriteOptions write_opts;
write_opts.sync = true;
std::unique_ptr<Transaction> committed_txn(
NewTxn(write_opts, TransactionOptions()));
ASSERT_NE(nullptr, committed_txn);
ASSERT_OK(committed_txn->Put("bar0", "bar_value_0"));
ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1"));
ASSERT_OK(committed_txn->SetName("committed_txn"));
ASSERT_OK(committed_txn->Prepare());
ASSERT_OK(committed_txn->Commit());
committed_txn.reset();
std::unique_ptr<Transaction> committed_without_prepare_txn(
NewTxn(write_opts, TransactionOptions()));
assert(committed_without_prepare_txn);
ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0"));
ASSERT_OK(
committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1"));
ASSERT_OK(
committed_without_prepare_txn->SetName("committed_without_prepare_txn"));
ASSERT_OK(committed_without_prepare_txn->Commit());
committed_without_prepare_txn.reset();
std::unique_ptr<Transaction> rolled_back_txn(
NewTxn(write_opts, TransactionOptions()));
ASSERT_NE(nullptr, rolled_back_txn);
ASSERT_OK(rolled_back_txn->Put("non_exist0", "donotcare"));
ASSERT_OK(rolled_back_txn->Put(handles_[1], "non_exist1", "donotcare"));
ASSERT_OK(rolled_back_txn->SetName("rolled_back_txn"));
ASSERT_OK(rolled_back_txn->Rollback());
rolled_back_txn.reset();
// Reopen and enable UDT to replay WAL entries.
options.allow_concurrent_memtable_write = false;
cf_descs[1].options.comparator = test::BytewiseComparatorWithU64TsWrapper();
cf_descs[1].options.persist_user_defined_timestamps = false;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
{
Transaction* recovered_txn1 =
db->GetTransactionByName("prepared_but_uncommitted_txn");
ASSERT_NE(nullptr, recovered_txn1);
std::string value;
ASSERT_OK(recovered_txn1->SetCommitTimestamp(23));
ASSERT_OK(recovered_txn1->Commit());
Status s = db->Get(ReadOptions(), handles_[0], "foo0", &value);
ASSERT_OK(s);
ASSERT_EQ("foo_value_0", value);
s = GetFromDb(ReadOptions(), handles_[1], "foo1", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("foo_value_1", value);
delete recovered_txn1;
ASSERT_EQ(nullptr, db->GetTransactionByName("committed_txn"));
s = db->Get(ReadOptions(), handles_[0], "bar0", &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_0", value);
s = GetFromDb(ReadOptions(), handles_[1], "bar1", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
ASSERT_EQ(nullptr,
db->GetTransactionByName("committed_without_prepare_txn"));
s = db->Get(ReadOptions(), handles_[0], "baz0", &value);
ASSERT_OK(s);
ASSERT_EQ("baz_value_0", value);
s = GetFromDb(ReadOptions(), handles_[1], "baz1", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("baz_value_1", value);
ASSERT_EQ(nullptr, db->GetTransactionByName("rolled_back_txn"));
s = db->Get(ReadOptions(), handles_[0], "non_exist0", &value);
ASSERT_TRUE(s.IsNotFound());
s = GetFromDb(ReadOptions(), handles_[1], "non_exist1", /*ts=*/23, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_P(WriteCommittedTxnWithTsTest, ChangeFromWriteCommittedAndDisableUDT) {
// This feature is not compatible with UDT in memtable only.
options.allow_concurrent_memtable_write = false;
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
cf_opts.persist_user_defined_timestamps = false;
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> prepared_but_uncommitted_txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(prepared_but_uncommitted_txn);
ASSERT_OK(prepared_but_uncommitted_txn->Put("foo0", "foo_value_0"));
ASSERT_OK(
prepared_but_uncommitted_txn->Put(handles_[1], "foo1", "foo_value_1"));
ASSERT_OK(
prepared_but_uncommitted_txn->SetName("prepared_but_uncommitted_txn"));
ASSERT_OK(prepared_but_uncommitted_txn->Prepare());
prepared_but_uncommitted_txn.reset();
WriteOptions write_opts;
write_opts.sync = true;
std::unique_ptr<Transaction> committed_txn(
NewTxn(write_opts, TransactionOptions()));
assert(committed_txn);
ASSERT_OK(committed_txn->Put("bar0", "bar_value_0"));
ASSERT_OK(committed_txn->Put(handles_[1], "bar1", "bar_value_1"));
ASSERT_OK(committed_txn->SetName("committed_txn"));
ASSERT_OK(committed_txn->Prepare());
ASSERT_OK(committed_txn->SetCommitTimestamp(/*ts=*/23));
ASSERT_OK(committed_txn->Commit());
committed_txn.reset();
std::unique_ptr<Transaction> committed_without_prepare_txn(
NewTxn(write_opts, TransactionOptions()));
assert(committed_without_prepare_txn);
ASSERT_OK(committed_without_prepare_txn->Put("baz0", "baz_value_0"));
ASSERT_OK(
committed_without_prepare_txn->Put(handles_[1], "baz1", "baz_value_1"));
ASSERT_OK(
committed_without_prepare_txn->SetName("committed_without_prepare_txn"));
ASSERT_OK(committed_without_prepare_txn->SetCommitTimestamp(/*ts=*/24));
ASSERT_OK(committed_without_prepare_txn->Commit());
committed_without_prepare_txn.reset();
// Disable UDT and change write policy.
cf_descs[1].options.comparator = BytewiseComparator();
txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_));
txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;
ASSERT_NOK(ReOpenNoDelete(cf_descs, &handles_));
}
TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) { TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) {
ASSERT_OK(ReOpenNoDelete()); ASSERT_OK(ReOpenNoDelete());