From 3b6dc049f788c7a562ee874fd751bb1abb12f21b Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 8 Mar 2022 16:20:59 -0800 Subject: [PATCH] Support user-defined timestamps in write-committed txns (#9629) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9629 Pessimistic transactions use pessimistic concurrency control, i.e. locking. Keys are locked upon first operation that writes the key or has the intention of writing. For example, `PessimisticTransaction::Put()`, `PessimisticTransaction::Delete()`, `PessimisticTransaction::SingleDelete()` will write to or delete a key, while `PessimisticTransaction::GetForUpdate()` is used by application to indicate to RocksDB that the transaction has the intention of performing write operation later in the same transaction. Pessimistic transactions support two-phase commit (2PC). A transaction can be `Prepared()`'ed and then `Commit()`. The prepare phase is similar to a promise: once `Prepare()` succeeds, the transaction has acquired the necessary resources to commit. The resources include locks, persistence of WAL, etc. Write-committed transaction is the default pessimistic transaction implementation. In RocksDB write-committed transaction, `Prepare()` will write data to the WAL as a prepare section. `Commit()` will write a commit marker to the WAL and then write data to the memtables. While writing to the memtables, different keys in the transaction's write batch will be assigned different sequence numbers in ascending order. Until commit/rollback, the transaction holds locks on the keys so that no other transaction can write to the same keys. Furthermore, the keys' sequence numbers represent the order in which they are committed and should be made visible. This is convenient for us to implement support for user-defined timestamps. Since column families with and without timestamps can co-exist in the same database, a transaction may or may not involve timestamps. Based on this observation, we add two optional members to each `PessimisticTransaction`, `read_timestamp_` and `commit_timestamp_`. If no key in the transaction's write batch has timestamp, then setting these two variables do not have any effect. For the rest of this commit, we discuss only the cases when these two variables are meaningful. read_timestamp_ is used mainly for validation, and should be set before first call to `GetForUpdate()`. Otherwise, the latter will return non-ok status. `GetForUpdate()` calls `TryLock()` that can verify if another transaction has written the same key since `read_timestamp_` till this call to `GetForUpdate()`. If another transaction has indeed written the same key, then validation fails, and RocksDB allows this transaction to refine `read_timestamp_` by increasing it. Note that a transaction can still use `Get()` with a different timestamp to read, but the result of the read should not be used to determine data that will be written later. commit_timestamp_ must be set after finishing writing and before transaction commit. This applies to both 2PC and non-2PC cases. In the case of 2PC, it's usually set after prepare phase succeeds. We currently require that the commit timestamp be chosen after all keys are locked. This means we disallow the `TransactionDB`-level APIs if user-defined timestamp is used by the transaction. Specifically, calling `PessimisticTransactionDB::Put()`, `PessimisticTransactionDB::Delete()`, `PessimisticTransactionDB::SingleDelete()`, etc. will return non-ok status because they specify timestamps before locking the keys. Users are also prompted to use the `Transaction` APIs when they receive the non-ok status. Reviewed By: ltamasi Differential Revision: D31822445 fbshipit-source-id: b82abf8e230216dc89cc519564a588224a88fd43 --- CMakeLists.txt | 1 + HISTORY.md | 1 + Makefile | 3 + TARGETS | 6 + db/db_impl/db_impl_write.cc | 13 +- src.mk | 1 + .../transactions/pessimistic_transaction.cc | 360 +++++++++++- .../transactions/pessimistic_transaction.h | 69 +++ .../pessimistic_transaction_db.cc | 34 +- .../transactions/pessimistic_transaction_db.h | 33 +- utilities/transactions/transaction_base.h | 5 +- utilities/transactions/transaction_test.cc | 22 +- utilities/transactions/transaction_test.h | 44 +- .../write_committed_transaction_ts_test.cc | 515 ++++++++++++++++++ 14 files changed, 1087 insertions(+), 20 deletions(-) create mode 100644 utilities/transactions/write_committed_transaction_ts_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 5de221b0ff..9b4f73050c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1334,6 +1334,7 @@ if(WITH_TESTS) utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc utilities/transactions/lock/point/point_lock_manager_test.cc + utilities/transactions/write_committed_transaction_ts_test.cc utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc utilities/transactions/lock/range/range_locking_test.cc diff --git a/HISTORY.md b/HISTORY.md index 03720b69fc..ecc8bf1cbb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp. +* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs. * Added BlobDB options to `ldb` * `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`. diff --git a/Makefile b/Makefile index 7cd92595f6..afc141bd2b 100644 --- a/Makefile +++ b/Makefile @@ -1866,6 +1866,9 @@ point_lock_manager_test: utilities/transactions/lock/point/point_lock_manager_te transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +write_committed_transaction_ts_test: $(OBJ_DIR)/utilities/transactions/write_committed_transaction_ts_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + write_prepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_prepared_transaction_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 126d3701a2..fdc71a1c4e 100644 --- a/TARGETS +++ b/TARGETS @@ -6120,6 +6120,12 @@ cpp_unittest_wrapper(name="write_callback_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="write_committed_transaction_ts_test", + srcs=["utilities/transactions/write_committed_transaction_ts_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="write_controller_test", srcs=["db/write_controller_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 61910ede41..bb9eae90ff 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -130,7 +130,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(!seq_per_batch_ || batch_cnt != 0); if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); - } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + } else if (!disable_memtable && + WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + // If writing to memtable, then we require the caller to set/update the + // timestamps for the keys in the write batch. + // Otherwise, it means we are just writing to the WAL, and we allow + // timestamps unset for the keys in the write batch. This can happen if we + // use TransactionDB with write-committed policy, and we currently do not + // support user-defined timestamp with other policies. + // In the prepare phase, a transaction can write the batch to the WAL + // without inserting to memtable. The keys in the batch do not have to be + // assigned timestamps because they will be used only during recovery if + // there is a commit marker which includes their commit timestamp. return Status::InvalidArgument("write batch must have timestamp(s) set"); } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && write_options.rate_limiter_priority != Env::IO_USER) { diff --git a/src.mk b/src.mk index f8c1f03079..705c6980ce 100644 --- a/src.mk +++ b/src.mk @@ -584,6 +584,7 @@ TEST_MAIN_SOURCES = \ utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \ utilities/transactions/write_unprepared_transaction_test.cc \ + utilities/transactions/write_committed_transaction_ts_test.cc \ utilities/ttl/ttl_test.cc \ utilities/util_merge_operators_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index d59a62fc19..9e45b71e92 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -25,6 +25,7 @@ #include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_util.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { @@ -135,6 +136,274 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options) {} +Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value, + bool exclusive, const bool do_validate) { + return GetForUpdateImpl(read_options, column_family, key, value, exclusive, + do_validate); +} + +Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + PinnableSlice* pinnable_val, + bool exclusive, const bool do_validate) { + return GetForUpdateImpl(read_options, column_family, key, pinnable_val, + exclusive, do_validate); +} + +template +inline Status WriteCommittedTxn::GetForUpdateImpl( + const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, TValue* value, bool exclusive, const bool do_validate) { + column_family = + column_family ? column_family : db_impl_->DefaultColumnFamily(); + assert(column_family); + if (!read_options.timestamp) { + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (0 == ts_sz) { + return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, + value, exclusive, do_validate); + } + } else { + Status s = db_impl_->FailIfTsSizesMismatch(column_family, + *(read_options.timestamp)); + if (!s.ok()) { + return s; + } + } + + if (!do_validate) { + return Status::InvalidArgument( + "If do_validate is false then GetForUpdate with read_timestamp is not " + "defined."); + } else if (kMaxTxnTimestamp == read_timestamp_) { + return Status::InvalidArgument("read_timestamp must be set for validation"); + } + + if (!read_options.timestamp) { + ReadOptions read_opts_copy = read_options; + char ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(ts_buf, read_timestamp_); + Slice ts(ts_buf, sizeof(ts_buf)); + read_opts_copy.timestamp = &ts; + return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key, + value, exclusive, do_validate); + } + assert(read_options.timestamp); + const char* const ts_buf = read_options.timestamp->data(); + assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp)); + TxnTimestamp ts = DecodeFixed64(ts_buf); + if (ts != read_timestamp_) { + return Status::InvalidArgument("Must read from the same read_timestamp"); + } + return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, + value, exclusive, do_validate); +} + +Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + return Operate( + column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, &value, this]() { + Status s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + return Operate( + column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, &value, this]() { + Status s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, + const Slice& key, const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDeleteUntracked( + ColumnFamilyHandle* column_family, const Slice& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Merge(column_family, key, value); + if (s.ok()) { + ++num_merges_; + } + return s; + }); +} + +template +Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family, + const TKey& key, const bool do_validate, + const bool assume_tracked, + TOperation&& operation) { + Status s; + if constexpr (std::is_same_v) { + s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true, + do_validate, assume_tracked); + } else if constexpr (std::is_same_v) { + std::string key_buf; + Slice contiguous_key(key, &key_buf); + s = TryLock(column_family, contiguous_key, /*read_only=*/false, + /*exclusive=*/true, do_validate, assume_tracked); + } + if (!s.ok()) { + return s; + } + column_family = + column_family ? column_family : db_impl_->DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0) { + assert(ts_sz == sizeof(TxnTimestamp)); + if (!IndexingEnabled()) { + cfs_with_ts_tracked_when_indexing_disabled_.insert( + column_family->GetID()); + } + } + return operation(); +} + Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) { if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) { return Status::InvalidArgument( @@ -154,6 +423,17 @@ Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) { } Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { + if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { + // CommitBatch() needs to lock the keys in the batch. + // However, the application also needs to specify the timestamp for the + // keys in batch before calling this API. + // This means timestamp order may violate the order of locking, thus + // violate the sequence number order for the same user key. + // Therefore, we disallow this operation for now. + return Status::NotSupported( + "Batch to commit includes timestamp assigned before locking"); + } + std::unique_ptr keys_to_unlock(lock_tracker_factory_.Create()); Status s = LockBatch(batch, keys_to_unlock.get()); @@ -369,9 +649,41 @@ Status PessimisticTransaction::Commit() { } Status WriteCommittedTxn::CommitWithoutPrepareInternal() { + WriteBatchWithIndex* wbwi = GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + + const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); + if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { + return Status::InvalidArgument("Must assign a commit timestamp"); + } + + if (needs_ts) { + assert(commit_timestamp_ != kMaxTxnTimestamp); + char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(commit_ts_buf, commit_timestamp_); + Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); + + Status s = + wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { + auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf); + if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) { + return sizeof(kMaxTxnTimestamp); + } + const Comparator* ucmp = + WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); + return ucmp ? ucmp->timestamp_size() + : std::numeric_limits::max(); + }); + if (!s.ok()) { + return s; + } + } + uint64_t seq_used = kMaxSequenceNumber; auto s = - db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(), + db_impl_->WriteImpl(write_options_, wb, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); assert(!s.ok() || seq_used != kMaxSequenceNumber); @@ -394,11 +706,46 @@ Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { } Status WriteCommittedTxn::CommitInternal() { + WriteBatchWithIndex* wbwi = GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + + const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); + if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { + return Status::InvalidArgument("Must assign a commit timestamp"); + } // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); - auto s = WriteBatchInternal::MarkCommit(working_batch, name_); - assert(s.ok()); + + Status s; + if (!needs_ts) { + s = WriteBatchInternal::MarkCommit(working_batch, name_); + } else { + assert(commit_timestamp_ != kMaxTxnTimestamp); + char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(commit_ts_buf, commit_timestamp_); + Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); + s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_, + commit_ts); + if (s.ok()) { + s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { + if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) != + cfs_with_ts_tracked_when_indexing_disabled_.end()) { + return sizeof(kMaxTxnTimestamp); + } + const Comparator* ucmp = + WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); + return ucmp ? ucmp->timestamp_size() + : std::numeric_limits::max(); + }); + } + } + + if (!s.ok()) { + return s; + } // any operations appended to this working_batch will be ignored from WAL working_batch->MarkWalTerminationPoint(); @@ -406,8 +753,7 @@ Status WriteCommittedTxn::CommitInternal() { // insert prepared batch into Memtable only skipping WAL. // Memtable will ignore BeginPrepare/EndPrepare markers // in non recovery mode and simply insert the values - s = WriteBatchInternal::Append(working_batch, - GetWriteBatch()->GetWriteBatch()); + s = WriteBatchInternal::Append(working_batch, wb); assert(s.ok()); uint64_t seq_used = kMaxSequenceNumber; @@ -489,6 +835,10 @@ Status PessimisticTransaction::RollbackToSavePoint() { // On success, caller should unlock keys_to_unlock Status PessimisticTransaction::LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock) { + if (!batch) { + return Status::InvalidArgument("batch is nullptr"); + } + class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 6908aa7a0d..609bcd6005 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -223,10 +223,73 @@ class WriteCommittedTxn : public PessimisticTransaction { ~WriteCommittedTxn() override {} + using TransactionBaseImpl::GetForUpdate; + Status GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool exclusive, + const bool do_validate) override; + Status GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val, bool exclusive, + const bool do_validate) override; + + using TransactionBaseImpl::Put; + // `key` does NOT include timestamp even when it's enabled. + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::PutUntracked; + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + + using TransactionBaseImpl::Delete; + // `key` does NOT include timestamp even when it's enabled. + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::DeleteUntracked; + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + + using TransactionBaseImpl::SingleDelete; + // `key` does NOT include timestamp even when it's enabled. + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::SingleDeleteUntracked; + Status SingleDeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + + using TransactionBaseImpl::Merge; + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status SetReadTimestampForValidation(TxnTimestamp ts) override; Status SetCommitTimestamp(TxnTimestamp ts) override; private: + template + Status GetForUpdateImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + TValue* value, bool exclusive, + const bool do_validate); + + template + Status Operate(ColumnFamilyHandle* column_family, const TKey& key, + const bool do_validate, const bool assume_tracked, + TOperation&& operation); + Status PrepareInternal() override; Status CommitWithoutPrepareInternal() override; @@ -236,6 +299,12 @@ class WriteCommittedTxn : public PessimisticTransaction { Status CommitInternal() override; Status RollbackInternal() override; + + // Column families that enable timestamps and whose data are written when + // indexing_enabled_ is false. If a key is written when indexing_enabled_ is + // true, then the corresponding column family is not added to cfs_with_ts + // even if it enables timestamp. + std::unordered_set cfs_with_ts_tracked_when_indexing_disabled_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index c5eeb5398f..c1e3a2ab2e 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -86,8 +86,10 @@ Status PessimisticTransactionDB::VerifyCFOptions( << " timestamp size is " << ts_sz << " bytes"; return Status::InvalidArgument(oss.str()); } - // TODO: Update this check once timestamp is supported. - return Status::NotSupported("Transaction DB does not support timestamp"); + if (txn_db_options_.write_policy != WRITE_COMMITTED) { + return Status::NotSupported("Only WriteCommittedTxn supports timestamp"); + } + return Status::OK(); } Status PessimisticTransactionDB::Initialize( @@ -442,7 +444,10 @@ Transaction* PessimisticTransactionDB::BeginInternalTransaction( Status PessimisticTransactionDB::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); @@ -463,7 +468,10 @@ Status PessimisticTransactionDB::Put(const WriteOptions& options, Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); @@ -485,7 +493,10 @@ Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); @@ -507,7 +518,10 @@ Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, Status PessimisticTransactionDB::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); @@ -533,6 +547,10 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, Status WriteCommittedTxnDB::Write(const WriteOptions& opts, WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } if (txn_db_options_.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { @@ -543,6 +561,10 @@ Status WriteCommittedTxnDB::Write(const WriteOptions& opts, Status WriteCommittedTxnDB::Write( const WriteOptions& opts, const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } if (optimizations.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index eb0dd2f052..c0a4b97362 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -155,6 +155,11 @@ class PessimisticTransactionDB : public TransactionDB { std::shared_ptr info_log_; const TransactionDBOptions txn_db_options_; + static Status FailIfBatchHasTs(const WriteBatch* wb); + + static Status FailIfCfEnablesTs(const DB* db, + const ColumnFamilyHandle* column_family); + void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); @@ -175,11 +180,12 @@ class PessimisticTransactionDB : public TransactionDB { friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; + Transaction* BeginInternalTransaction(const WriteOptions& options); + std::shared_ptr lock_manager_; // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; - Transaction* BeginInternalTransaction(const WriteOptions& options); // Used to ensure that no locks are stolen from an expirable transaction // that has started a commit. Only transactions with an expiration time @@ -224,5 +230,30 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; }; +inline Status PessimisticTransactionDB::FailIfBatchHasTs( + const WriteBatch* batch) { + if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { + return Status::NotSupported( + "Writes with timestamp must go through transaction API instead of " + "TransactionDB."); + } + return Status::OK(); +} + +inline Status PessimisticTransactionDB::FailIfCfEnablesTs( + const DB* db, const ColumnFamilyHandle* column_family) { + assert(db); + column_family = column_family ? column_family : db->DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() > 0) { + return Status::NotSupported( + "Write operation with user timestamp must go through the transaction " + "API instead of TransactionDB."); + } + return Status::OK(); +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 7febb10ba6..e4da8d44b9 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -220,6 +220,8 @@ class TransactionBaseImpl : public Transaction { void EnableIndexing() override { indexing_enabled_ = true; } + bool IndexingEnabled() const { return indexing_enabled_; } + uint64_t GetElapsedTime() const override; uint64_t GetNumPuts() const override; @@ -278,6 +280,8 @@ class TransactionBaseImpl : public Transaction { assert(s.ok()); } + WriteBatchBase* GetBatchForWrite(); + DB* db_; DBImpl* dbimpl_; @@ -365,7 +369,6 @@ class TransactionBaseImpl : public Transaction { bool read_only, bool exclusive, const bool do_validate = true, const bool assume_tracked = false); - WriteBatchBase* GetBatchForWrite(); void SetSnapshotInternal(const Snapshot* snapshot); }; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 1970cf6b21..53f9606fed 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6421,12 +6421,18 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) { cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); { ColumnFamilyHandle* cfh = nullptr; - ASSERT_TRUE( - db->CreateColumnFamily(cf_opts, test_cf_name, &cfh).IsNotSupported()); + const Status s = db->CreateColumnFamily(cf_opts, test_cf_name, &cfh); + if (txn_db_options.write_policy == WRITE_COMMITTED) { + ASSERT_OK(s); + delete cfh; + } else { + ASSERT_TRUE(s.IsNotSupported()); + assert(!cfh); + } } // Bypass transaction db layer. - { + if (txn_db_options.write_policy != WRITE_COMMITTED) { DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); assert(db_impl); ColumnFamilyHandle* cfh = nullptr; @@ -6439,7 +6445,15 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) { cf_descs.emplace_back(kDefaultColumnFamilyName, options); cf_descs.emplace_back(test_cf_name, cf_opts); std::vector handles; - ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsNotSupported()); + const Status s = ReOpenNoDelete(cf_descs, &handles); + if (txn_db_options.write_policy == WRITE_COMMITTED) { + ASSERT_OK(s); + for (auto* h : handles) { + delete h; + } + } else { + ASSERT_TRUE(s.IsNotSupported()); + } } } diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 2aecb312a0..2780cf24de 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -170,12 +170,12 @@ class TransactionTestBase : public ::testing::Test { txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, use_seq_per_batch, use_batch_per_txn); - StackableDB* stackable_db = new StackableDB(root_db); + auto stackable_db = std::make_unique(root_db); if (s.ok()) { assert(root_db != nullptr); // If WrapStackableDB() returns non-ok, then stackable_db is already // deleted within WrapStackableDB(). - s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + s = TransactionDB::WrapStackableDB(stackable_db.release(), txn_db_options, compaction_enabled_cf_indices, *handles, &db); } @@ -522,4 +522,44 @@ class MySQLStyleTransactionTest const bool with_slow_threads_; }; +class WriteCommittedTxnWithTsTest + : public TransactionTestBase, + public ::testing::WithParamInterface> { + public: + WriteCommittedTxnWithTsTest() + : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), + WRITE_COMMITTED, kOrderedWrite) {} + ~WriteCommittedTxnWithTsTest() override { + for (auto* h : handles_) { + delete h; + } + } + + Status GetFromDb(ReadOptions read_opts, ColumnFamilyHandle* column_family, + const Slice& key, TxnTimestamp ts, std::string* value) { + std::string ts_buf; + PutFixed64(&ts_buf, ts); + Slice ts_slc = ts_buf; + read_opts.timestamp = &ts_slc; + assert(db); + return db->Get(read_opts, column_family, key, value); + } + + Transaction* NewTxn(WriteOptions write_opts, TransactionOptions txn_opts) { + assert(db); + auto* txn = db->BeginTransaction(write_opts, txn_opts); + assert(txn); + const bool enable_indexing = std::get<2>(GetParam()); + if (enable_indexing) { + txn->EnableIndexing(); + } else { + txn->DisableIndexing(); + } + return txn; + } + + protected: + std::vector handles_{}; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc b/utilities/transactions/write_committed_transaction_ts_test.cc new file mode 100644 index 0000000000..57ea6b3030 --- /dev/null +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -0,0 +1,515 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/merge_operators.h" +#ifndef ROCKSDB_LITE + +#include "test_util/testutil.h" +#include "utilities/transactions/transaction_test.h" + +namespace ROCKSDB_NAMESPACE { + +INSTANTIATE_TEST_CASE_P( + DBAsBaseDB, WriteCommittedTxnWithTsTest, + ::testing::Values(std::make_tuple(false, /*two_write_queue=*/false, + /*enable_indexing=*/false), + std::make_tuple(false, /*two_write_queue=*/true, + /*enable_indexing=*/false), + std::make_tuple(false, /*two_write_queue=*/false, + /*enable_indexing=*/true), + std::make_tuple(false, /*two_write_queue=*/true, + /*enable_indexing=*/true))); + +INSTANTIATE_TEST_CASE_P( + DBAsStackableDB, WriteCommittedTxnWithTsTest, + ::testing::Values(std::make_tuple(true, /*two_write_queue=*/false, + /*enable_indexing=*/false), + std::make_tuple(true, /*two_write_queue=*/true, + /*enable_indexing=*/false), + std::make_tuple(true, /*two_write_queue=*/false, + /*enable_indexing=*/true), + std::make_tuple(true, /*two_write_queue=*/true, + /*enable_indexing=*/true))); + +TEST_P(WriteCommittedTxnWithTsTest, SanityChecks) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->Put(handles_[1], "foo", "value")); + ASSERT_TRUE(txn->Commit().IsInvalidArgument()); + + auto* pessimistic_txn = + static_cast_with_check(txn.get()); + ASSERT_TRUE( + pessimistic_txn->CommitBatch(/*batch=*/nullptr).IsInvalidArgument()); + + { + WriteBatchWithIndex* wbwi = txn->GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + // Write a key to the batch for nonexisting cf. + ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"", + /*value=*/"")); + } + + ASSERT_OK(txn->SetCommitTimestamp(20)); + + ASSERT_TRUE(txn->Commit().IsInvalidArgument()); + txn.reset(); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Put(handles_[1], "foo", "value")); + { + WriteBatchWithIndex* wbwi = txn1->GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + // Write a key to the batch for non-existing cf. + ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"", + /*value=*/"")); + } + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(21)); + ASSERT_TRUE(txn1->Commit().IsInvalidArgument()); + txn1.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, ReOpenWithTimestamp) { + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + ASSERT_OK(txn0->Put(handles_[1], "foo", "value")); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + ASSERT_TRUE(txn0->Commit().IsInvalidArgument()); + txn0.reset(); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->Put(handles_[1], "foo", "value1")); + { + std::string buf; + PutFixed64(&buf, 23); + ASSERT_OK(txn1->Put("id", buf)); + ASSERT_OK(txn1->Merge("id", buf)); + } + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + { + std::string value; + const Status s = + GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("value1", value); + } + + { + std::string value; + const Status s = db->Get(ReadOptions(), handles_[0], "id", &value); + ASSERT_OK(s); + uint64_t ival = 0; + Slice value_slc = value; + bool result = GetFixed64(&value_slc, &ival); + assert(result); + ASSERT_EQ(46, ival); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + ASSERT_OK(txn0->Put(handles_[1], "foo", "foo_value")); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr txn1(NewTxn(write_opts, TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->Put("bar", "bar_value_1")); + ASSERT_OK(txn1->Put(handles_[1], "bar", "bar_value_1")); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + std::unique_ptr txn2(NewTxn(write_opts, TransactionOptions())); + assert(txn2); + ASSERT_OK(txn2->Put("key1", "value_3")); + ASSERT_OK(txn2->Put(handles_[1], "key1", "value_3")); + ASSERT_OK(txn2->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(txn2->Commit()); + txn2.reset(); + + txn0.reset(); + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + std::string value; + Status s = GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(ReadOptions(), handles_[0], "bar", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + value.clear(); + s = GetFromDb(ReadOptions(), handles_[1], "bar", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(ReadOptions(), handles_[0], "key1", &value); + ASSERT_OK(s); + ASSERT_EQ("value_3", value); + + s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/24, &value); + ASSERT_OK(s); + ASSERT_EQ("value_3", value); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_options); + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::string key_str = "tes_key"; + std::string ts_str; + std::string value_str = "test_value"; + PutFixed64(&ts_str, 100); + Slice value = value_str; + + ASSERT_TRUE( + db->Put(WriteOptions(), handles_[1], "foo", "bar").IsNotSupported()); + ASSERT_TRUE(db->Delete(WriteOptions(), handles_[1], "foo").IsNotSupported()); + ASSERT_TRUE( + db->SingleDelete(WriteOptions(), handles_[1], "foo").IsNotSupported()); + ASSERT_TRUE( + db->Merge(WriteOptions(), handles_[1], "foo", "+1").IsNotSupported()); + WriteBatch wb1(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + ASSERT_OK(wb1.Put(handles_[1], key_str, ts_str, value)); + ASSERT_TRUE(db->Write(WriteOptions(), &wb1).IsNotSupported()); + ASSERT_TRUE(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb1) + .IsNotSupported()); + auto* pessimistic_txn_db = + static_cast_with_check(db); + assert(pessimistic_txn_db); + ASSERT_TRUE( + pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb1) + .IsNotSupported()); + + ASSERT_OK(db->Put(WriteOptions(), "foo", "value")); + ASSERT_OK(db->Put(WriteOptions(), "bar", "value")); + ASSERT_OK(db->Delete(WriteOptions(), "bar")); + ASSERT_OK(db->SingleDelete(WriteOptions(), "foo")); + ASSERT_OK(db->Put(WriteOptions(), "key", "value")); + ASSERT_OK(db->Merge(WriteOptions(), "key", "_more")); + WriteBatch wb2(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + ASSERT_OK(wb2.Put(key_str, value)); + ASSERT_OK(db->Write(WriteOptions(), &wb2)); + ASSERT_OK(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb2)); + ASSERT_OK( + pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb2)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + + WriteBatch wb3(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + + ASSERT_OK(wb3.Put(handles_[1], "key", "value")); + auto* pessimistic_txn = + static_cast_with_check(txn.get()); + assert(pessimistic_txn); + ASSERT_TRUE(pessimistic_txn->CommitBatch(&wb3).IsNotSupported()); + + txn.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, Merge) { + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->Put(handles_[1], "foo", "bar")); + ASSERT_TRUE(txn->Merge(handles_[1], "foo", "1").IsInvalidArgument()); + txn.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn1->Put(handles_[1], "key", "value1")); + ASSERT_OK(txn1->SetCommitTimestamp(24)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + std::string value; + ASSERT_OK(txn0->SetReadTimestampForValidation(23)); + ASSERT_TRUE( + txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value).IsBusy()); + ASSERT_OK(txn0->Rollback()); + txn0.reset(); + + std::unique_ptr txn2( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn2->SetReadTimestampForValidation(25)); + ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value)); + ASSERT_OK(txn2->SetCommitTimestamp(26)); + ASSERT_OK(txn2->Commit()); + txn2.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, BlindWrite) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + + { + std::string value; + ASSERT_OK(txn0->SetReadTimestampForValidation(100)); + // Lock "key". + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value) + .IsNotFound()); + } + + ASSERT_OK(txn0->Put(handles_[1], "key", "value0")); + ASSERT_OK(txn0->SetCommitTimestamp(101)); + ASSERT_OK(txn0->Commit()); + + ASSERT_OK(txn1->Put(handles_[1], "key", "value1")); + // In reality, caller needs to ensure commit_ts of txn1 is greater than the + // commit_ts of txn0, which is true for lock-based concurrency control. + ASSERT_OK(txn1->SetCommitTimestamp(102)); + ASSERT_OK(txn1->Commit()); + + txn0.reset(); + txn1.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + + { + ASSERT_OK(txn0->SetReadTimestampForValidation(100)); + // Lock "key0", "key1", ..., "key4". + for (int i = 0; i < 5; ++i) { + std::string value; + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], + "key" + std::to_string(i), &value) + .IsNotFound()); + } + } + ASSERT_OK(txn1->Put(handles_[1], "key5", "value5_0")); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(101)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + { + std::string value; + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value) + .IsBusy()); + ASSERT_OK(txn0->SetReadTimestampForValidation(102)); + ASSERT_OK(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value)); + ASSERT_EQ("value5_0", value); + } + + for (int i = 0; i < 6; ++i) { + ASSERT_OK(txn0->Put(handles_[1], "key" + std::to_string(i), + "value" + std::to_string(i))); + } + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + ASSERT_OK(txn0->SetCommitTimestamp(103)); + ASSERT_OK(txn0->Commit()); + txn0.reset(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Transactions not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE