diff --git a/CMakeLists.txt b/CMakeLists.txt index bd7a8fbe45..8209f30fe9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -520,7 +520,7 @@ set(SOURCES utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction_impl.cc utilities/transactions/transaction_base.cc - utilities/transactions/transaction_db_impl.cc + utilities/transactions/pessimistic_transaction_db.cc utilities/transactions/transaction_db_mutex_impl.cc utilities/transactions/transaction_impl.cc utilities/transactions/transaction_lock_mgr.cc diff --git a/TARGETS b/TARGETS index f2aa661f38..e52f507074 100644 --- a/TARGETS +++ b/TARGETS @@ -247,11 +247,12 @@ cpp_library( "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/optimistic_transaction_impl.cc", "utilities/transactions/transaction_base.cc", - "utilities/transactions/transaction_db_impl.cc", + "utilities/transactions/pessimistic_transaction_db.cc", "utilities/transactions/transaction_db_mutex_impl.cc", "utilities/transactions/transaction_impl.cc", "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", + "utilities/transactions/write_prepared_transaction_impl.cc", "utilities/ttl/db_ttl_impl.cc", "utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc", diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 259f50fe6a..548518f600 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -23,6 +23,12 @@ namespace rocksdb { class TransactionDBMutexFactory; +enum TxnDBWritePolicy { + WRITE_COMMITTED = 0, // write only the committed data + WRITE_PREPARED, // write data after the prepare phase of 2pc + WRITE_UNPREPARED // write data before the prepare phase of 2pc +}; + struct TransactionDBOptions { // Specifies the maximum number of keys that can be locked at the 
same time // per column family. @@ -66,6 +72,12 @@ struct TransactionDBOptions { // condition variable for all transaction locking instead of the default // mutex/condvar implementation. std::shared_ptr custom_mutex_factory; + + // The policy for when to write the data into the DB. The default policy is to + // write only the committed data (WRITE_COMMITTED). The data could be written + // before the commit phase. The DB then needs to provide the mechanisms to + // tell apart committed from uncommitted data. + TxnDBWritePolicy write_policy = TxnDBWritePolicy::WRITE_COMMITTED; }; struct TransactionOptions { diff --git a/src.mk b/src.mk index 0b0d4e6ab5..44c59fea7d 100644 --- a/src.mk +++ b/src.mk @@ -195,7 +195,7 @@ LIB_SOURCES = \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/optimistic_transaction_impl.cc \ utilities/transactions/transaction_base.cc \ - utilities/transactions/transaction_db_impl.cc \ + utilities/transactions/pessimistic_transaction_db.cc \ utilities/transactions/transaction_db_mutex_impl.cc \ utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/pessimistic_transaction_db.cc similarity index 75% rename from utilities/transactions/transaction_db_impl.cc rename to utilities/transactions/pessimistic_transaction_db.cc index bd43b585ac..052dc80f73 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -5,7 +5,7 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/pessimistic_transaction_db.h" #include #include @@ -21,8 +21,8 @@ namespace rocksdb { -TransactionDBImpl::TransactionDBImpl(DB* db, - const TransactionDBOptions& txn_db_options) +PessimisticTransactionDB::PessimisticTransactionDB( + DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check(db)), 
txn_db_options_(txn_db_options), @@ -34,9 +34,9 @@ TransactionDBImpl::TransactionDBImpl(DB* db, assert(db_impl_ != nullptr); } -// Support initiliazing TransactionDBImpl from a stackable db +// Support initializing PessimisticTransactionDB from a stackable db // -// TransactionDBImpl +// PessimisticTransactionDB // ^ ^ // | | // | + @@ -50,8 +50,8 @@ TransactionDBImpl::TransactionDBImpl(DB* db, // + // DB // -TransactionDBImpl::TransactionDBImpl(StackableDB* db, - const TransactionDBOptions& txn_db_options) +PessimisticTransactionDB::PessimisticTransactionDB( + StackableDB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check(db->GetRootDB())), txn_db_options_(txn_db_options), @@ -63,13 +63,13 @@ TransactionDBImpl::TransactionDBImpl(StackableDB* db, assert(db_impl_ != nullptr); } -TransactionDBImpl::~TransactionDBImpl() { +PessimisticTransactionDB::~PessimisticTransactionDB() { while (!transactions_.empty()) { delete transactions_.begin()->second; } } -Status TransactionDBImpl::Initialize( +Status PessimisticTransactionDB::Initialize( const std::vector& compaction_enabled_cf_indices, const std::vector& handles) { for (auto cf_ptr : handles) { @@ -121,7 +121,7 @@ Status TransactionDBImpl::Initialize( return s; } -Transaction* TransactionDBImpl::BeginTransaction( +Transaction* WriteCommittedTxnDB::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { if (old_txn != nullptr) { @@ -132,7 +132,18 @@ Transaction* TransactionDBImpl::BeginTransaction( } } -TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions( +Transaction* WritePreparedTxnDB::BeginTransaction( + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { + if (old_txn != nullptr) { + ReinitializeTransaction(old_txn, write_options, txn_options); + return old_txn; + } else { + return new WritePreparedTxnImpl(this, write_options, txn_options); + } +} + 
+TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( const TransactionDBOptions& txn_db_options) { TransactionDBOptions validated = txn_db_options; @@ -213,8 +224,19 @@ Status TransactionDB::WrapDB( DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - TransactionDBImpl* txn_db = new TransactionDBImpl( - db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); + PessimisticTransactionDB* txn_db; + switch (txn_db_options.write_policy) { + case WRITE_UNPREPARED: + return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); + case WRITE_PREPARED: + txn_db = new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + break; + case WRITE_COMMITTED: + default: + txn_db = new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + } *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); return s; @@ -227,8 +249,19 @@ Status TransactionDB::WrapStackableDB( StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { - TransactionDBImpl* txn_db = new TransactionDBImpl( - db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); + PessimisticTransactionDB* txn_db; + switch (txn_db_options.write_policy) { + case WRITE_UNPREPARED: + return Status::NotSupported("WRITE_UNPREPARED is not implemented yet"); + case WRITE_PREPARED: + txn_db = new WritePreparedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + break; + case WRITE_COMMITTED: + default: + txn_db = new WriteCommittedTxnDB( + db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)); + } *dbptr = txn_db; Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); return s; @@ -236,11 +269,12 @@ Status 
TransactionDB::WrapStackableDB( // Let TransactionLockMgr know that this column family exists so it can // allocate a LockMap for it. -void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) { +void PessimisticTransactionDB::AddColumnFamily( + const ColumnFamilyHandle* handle) { lock_mgr_.AddColumnFamily(handle->GetID()); } -Status TransactionDBImpl::CreateColumnFamily( +Status PessimisticTransactionDB::CreateColumnFamily( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) { InstrumentedMutexLock l(&column_family_mutex_); @@ -255,7 +289,8 @@ Status TransactionDBImpl::CreateColumnFamily( // Let TransactionLockMgr know that it can deallocate the LockMap for this // column family. -Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { +Status PessimisticTransactionDB::DropColumnFamily( + ColumnFamilyHandle* column_family) { InstrumentedMutexLock l(&column_family_mutex_); Status s = db_->DropColumnFamily(column_family); @@ -266,23 +301,24 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { return s; } -Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id, - const std::string& key, bool exclusive) { +Status PessimisticTransactionDB::TryLock(PessimisticTxn* txn, uint32_t cfh_id, + const std::string& key, + bool exclusive) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); } -void TransactionDBImpl::UnLock(PessimisticTxn* txn, - const TransactionKeyMap* keys) { +void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, + const TransactionKeyMap* keys) { lock_mgr_.UnLock(txn, keys, GetEnv()); } -void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id, - const std::string& key) { +void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, uint32_t cfh_id, + const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction -Transaction* 
TransactionDBImpl::BeginInternalTransaction( +Transaction* PessimisticTransactionDB::BeginInternalTransaction( const WriteOptions& options) { TransactionOptions txn_options; Transaction* txn = BeginTransaction(options, txn_options, nullptr); @@ -301,9 +337,9 @@ Transaction* TransactionDBImpl::BeginInternalTransaction( // sort its keys before locking them. This guarantees that TransactionDB write // methods cannot deadlock with eachother (but still could deadlock with a // Transaction). -Status TransactionDBImpl::Put(const WriteOptions& options, - ColumnFamilyHandle* column_family, - const Slice& key, const Slice& val) { +Status PessimisticTransactionDB::Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { Status s; Transaction* txn = BeginInternalTransaction(options); @@ -322,9 +358,9 @@ Status TransactionDBImpl::Put(const WriteOptions& options, return s; } -Status TransactionDBImpl::Delete(const WriteOptions& wopts, - ColumnFamilyHandle* column_family, - const Slice& key) { +Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, + ColumnFamilyHandle* column_family, + const Slice& key) { Status s; Transaction* txn = BeginInternalTransaction(wopts); @@ -344,9 +380,9 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts, return s; } -Status TransactionDBImpl::Merge(const WriteOptions& options, - ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { +Status PessimisticTransactionDB::Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { Status s; Transaction* txn = BeginInternalTransaction(options); @@ -366,7 +402,8 @@ Status TransactionDBImpl::Merge(const WriteOptions& options, return s; } -Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { +Status PessimisticTransactionDB::Write(const WriteOptions& opts, + WriteBatch* updates) { // Need to lock all keys in this batch to prevent 
write conflicts with // concurrent transactions. Transaction* txn = BeginInternalTransaction(opts); @@ -385,19 +422,19 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { return s; } -void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, - PessimisticTxn* tx) { +void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id, + PessimisticTxn* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); } -void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) { +void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) { std::lock_guard lock(map_mutex_); expirable_transactions_map_.erase(tx_id); } -bool TransactionDBImpl::TryStealingExpiredTransactionLocks( +bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( TransactionID tx_id) { std::lock_guard lock(map_mutex_); @@ -409,7 +446,7 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks( return tx.TryStealingLocks(); } -void TransactionDBImpl::ReinitializeTransaction( +void PessimisticTransactionDB::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { auto txn_impl = static_cast_with_check(txn); @@ -417,7 +454,7 @@ void TransactionDBImpl::ReinitializeTransaction( txn_impl->Reinitialize(this, write_options, txn_options); } -Transaction* TransactionDBImpl::GetTransactionByName( +Transaction* PessimisticTransactionDB::GetTransactionByName( const TransactionName& name) { std::lock_guard lock(name_map_mutex_); auto it = transactions_.find(name); @@ -428,7 +465,7 @@ Transaction* TransactionDBImpl::GetTransactionByName( } } -void TransactionDBImpl::GetAllPreparedTransactions( +void PessimisticTransactionDB::GetAllPreparedTransactions( std::vector* transv) { assert(transv); transv->clear(); @@ -440,11 +477,12 @@ void TransactionDBImpl::GetAllPreparedTransactions( } } 
-TransactionLockMgr::LockStatusData TransactionDBImpl::GetLockStatusData() { +TransactionLockMgr::LockStatusData +PessimisticTransactionDB::GetLockStatusData() { return lock_mgr_.GetLockStatusData(); } -void TransactionDBImpl::RegisterTransaction(Transaction* txn) { +void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { assert(txn); assert(txn->GetName().length() > 0); assert(GetTransactionByName(txn->GetName()) == nullptr); @@ -453,7 +491,7 @@ void TransactionDBImpl::RegisterTransaction(Transaction* txn) { transactions_[txn->GetName()] = txn; } -void TransactionDBImpl::UnregisterTransaction(Transaction* txn) { +void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { assert(txn); std::lock_guard lock(name_map_mutex_); auto it = transactions_.find(txn->GetName()); diff --git a/utilities/transactions/transaction_db_impl.h b/utilities/transactions/pessimistic_transaction_db.h similarity index 67% rename from utilities/transactions/transaction_db_impl.h rename to utilities/transactions/pessimistic_transaction_db.h index dfc13fbd70..d9cf7d5586 100644 --- a/utilities/transactions/transaction_db_impl.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -17,25 +17,26 @@ #include "rocksdb/utilities/transaction_db.h" #include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/transaction_lock_mgr.h" +#include "utilities/transactions/write_prepared_transaction_impl.h" namespace rocksdb { -class TransactionDBImpl : public TransactionDB { +class PessimisticTransactionDB : public TransactionDB { public: - explicit TransactionDBImpl(DB* db, - const TransactionDBOptions& txn_db_options); + explicit PessimisticTransactionDB(DB* db, + const TransactionDBOptions& txn_db_options); - explicit TransactionDBImpl(StackableDB* db, - const TransactionDBOptions& txn_db_options); + explicit PessimisticTransactionDB(StackableDB* db, + const TransactionDBOptions& txn_db_options); - ~TransactionDBImpl(); + virtual 
~PessimisticTransactionDB(); Status Initialize(const std::vector& compaction_enabled_cf_indices, const std::vector& handles); Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, - Transaction* old_txn) override; + Transaction* old_txn) override = 0; using StackableDB::Put; virtual Status Put(const WriteOptions& options, @@ -97,11 +98,12 @@ class TransactionDBImpl : public TransactionDB { TransactionLockMgr::LockStatusData GetLockStatusData() override; - private: + protected: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); + private: DBImpl* db_impl_; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; @@ -122,5 +124,44 @@ class TransactionDBImpl : public TransactionDB { std::unordered_map transactions_; }; +// A PessimisticTransactionDB that writes the data to the DB after the commit. +// In this way the DB only contains the committed data. +class WriteCommittedTxnDB : public PessimisticTransactionDB { + public: + explicit WriteCommittedTxnDB(DB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + explicit WriteCommittedTxnDB(StackableDB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + virtual ~WriteCommittedTxnDB() {} + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override; +}; + +// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC. +// In this way some data in the DB might not be committed. The DB provides +// mechanisms to tell such data apart from committed data. 
+class WritePreparedTxnDB : public PessimisticTransactionDB { + public: + explicit WritePreparedTxnDB(DB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + explicit WritePreparedTxnDB(StackableDB* db, + const TransactionDBOptions& txn_db_options) + : PessimisticTransactionDB(db, txn_db_options) {} + + virtual ~WritePreparedTxnDB() {} + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, + Transaction* old_txn) override; +}; + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index ececec6d53..a2219e1a38 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -22,7 +22,7 @@ #include "util/cast_util.h" #include "util/string_util.h" #include "util/sync_point.h" -#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { @@ -48,7 +48,7 @@ PessimisticTxn::PessimisticTxn(TransactionDB* txn_db, deadlock_detect_(false), deadlock_detect_depth_(0) { txn_db_impl_ = - static_cast_with_check(txn_db); + static_cast_with_check(txn_db); db_impl_ = static_cast_with_check(db_); Initialize(txn_options); } diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 8445b0a50a..dce5c7b97e 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -30,7 +30,7 @@ namespace rocksdb { -class TransactionDBImpl; +class PessimisticTransactionDB; class PessimisticTxn; // A transaction under pessimistic concurrency control. 
This class implements @@ -121,7 +121,7 @@ class PessimisticTxn : public TransactionBaseImpl { void Clear() override; - TransactionDBImpl* txn_db_impl_; + PessimisticTransactionDB* txn_db_impl_; DBImpl* db_impl_; // If non-zero, this transaction should not be committed after this time (in diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 99e71eeb0d..95612cd397 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -26,7 +26,7 @@ #include "util/murmurhash.h" #include "util/sync_point.h" #include "util/thread_local.h" -#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/pessimistic_transaction_db.h" namespace rocksdb { @@ -115,7 +115,7 @@ TransactionLockMgr::TransactionLockMgr( mutex_factory_(mutex_factory) { assert(txn_db); txn_db_impl_ = - static_cast_with_check(txn_db); + static_cast_with_check(txn_db); } TransactionLockMgr::~TransactionLockMgr() {} diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 6c0d1e99dc..86a65783fd 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -27,7 +27,7 @@ struct LockMap; struct LockMapStripe; class Slice; -class TransactionDBImpl; +class PessimisticTransactionDB; class TransactionLockMgr { public: @@ -61,7 +61,7 @@ class TransactionLockMgr { LockStatusData GetLockStatusData(); private: - TransactionDBImpl* txn_db_impl_; + PessimisticTransactionDB* txn_db_impl_; // Default number of lock map stripes per column family const size_t default_num_stripes_; diff --git a/utilities/transactions/write_prepared_transaction_impl.cc b/utilities/transactions/write_prepared_transaction_impl.cc index ded6bcb2bc..c018e94604 100644 --- a/utilities/transactions/write_prepared_transaction_impl.cc +++ b/utilities/transactions/write_prepared_transaction_impl.cc @@ -21,7 
+21,7 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/string_util.h" #include "util/sync_point.h" -#include "utilities/transactions/transaction_db_impl.h" +#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_impl.h" #include "utilities/transactions/transaction_util.h"