// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "utilities/transactions/pessimistic_transaction_db.h" #include #include #include #include #include #include "db/db_impl/db_impl.h" #include "logging/logging.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/mutexlock.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/write_prepared_txn_db.h" #include "utilities/transactions/write_unprepared_txn_db.h" namespace ROCKSDB_NAMESPACE { PessimisticTransactionDB::PessimisticTransactionDB( DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check(db)), txn_db_options_(txn_db_options), lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } // Support initiliazing PessimisticTransactionDB from a stackable db // // PessimisticTransactionDB // ^ ^ // | | // | + // | StackableDB // | ^ // | | // + + // DBImpl // ^ // |(inherit) // + // DB // PessimisticTransactionDB::PessimisticTransactionDB( StackableDB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check(db->GetRootDB())), txn_db_options_(txn_db_options), lock_manager_(NewLockManager(this, txn_db_options)) { assert(db_impl_ != nullptr); } PessimisticTransactionDB::~PessimisticTransactionDB() { while (!transactions_.empty()) { delete transactions_.begin()->second; // TODO(myabandeh): this seems to be an unsafe approach as it is not quite // clear whether delete would also remove the entry from transactions_. } } Status PessimisticTransactionDB::VerifyCFOptions( const ColumnFamilyOptions& cf_options) { const Comparator* const ucmp = cf_options.comparator; assert(ucmp); size_t ts_sz = ucmp->timestamp_size(); if (0 == ts_sz) { return Status::OK(); } if (ts_sz != sizeof(TxnTimestamp)) { std::ostringstream oss; oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp) << " bytes. CF comparator " << std::string(ucmp->Name()) << " timestamp size is " << ts_sz << " bytes"; return Status::InvalidArgument(oss.str()); } if (txn_db_options_.write_policy != WRITE_COMMITTED) { return Status::NotSupported("Only WriteCommittedTxn supports timestamp"); } return Status::OK(); } Status PessimisticTransactionDB::Initialize( const std::vector& compaction_enabled_cf_indices, const std::vector& handles) { for (auto cf_ptr : handles) { AddColumnFamily(cf_ptr); } // Verify cf options for (auto handle : handles) { ColumnFamilyDescriptor cfd; Status s = handle->GetDescriptor(&cfd); if (!s.ok()) { return s; } s = VerifyCFOptions(cfd.options); if (!s.ok()) { return s; } } // Re-enable compaction for the column families that initially had // compaction enabled. std::vector compaction_enabled_cf_handles; compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); for (auto index : compaction_enabled_cf_indices) { compaction_enabled_cf_handles.push_back(handles[index]); } Status s = EnableAutoCompaction(compaction_enabled_cf_handles); // create 'real' transactions from recovered shell transactions auto dbimpl = static_cast_with_check(GetRootDB()); assert(dbimpl != nullptr); auto rtrxs = dbimpl->recovered_transactions(); for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) { auto recovered_trx = it->second; assert(recovered_trx); assert(recovered_trx->batches_.size() == 1); const auto& seq = recovered_trx->batches_.begin()->first; const auto& batch_info = recovered_trx->batches_.begin()->second; assert(batch_info.log_number_); assert(recovered_trx->name_.length()); // TODO: plumb Env::IOActivity, Env::IOPriority WriteOptions w_options; w_options.sync = true; TransactionOptions t_options; // This would help avoiding deadlock for keys that although exist in the WAL // did not go through concurrency control. This includes the merge that // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if // there is a conflict between the keys of two transactions that must be // avoided, it is already avoided by the application, MyRocks, before the // restart (ii) application, MyRocks, guarntees to rollback/commit the // recovered transactions before new transactions start. t_options.skip_concurrency_control = true; Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); assert(real_trx); real_trx->SetLogNumber(batch_info.log_number_); assert(seq != kMaxSequenceNumber); if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) { real_trx->SetId(seq); } s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { break; } s = real_trx->RebuildFromWriteBatch(batch_info.batch_); // WriteCommitted set this to to disable this check that is specific to // WritePrepared txns assert(batch_info.batch_cnt_ == 0 || real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_); real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { break; } } if (s.ok()) { dbimpl->DeleteAllRecoveredTransactions(); } return s; } Transaction* WriteCommittedTxnDB::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { if (old_txn != nullptr) { ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { return new WriteCommittedTxn(this, write_options, txn_options); } } TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions( const TransactionDBOptions& txn_db_options) { TransactionDBOptions validated = txn_db_options; if (txn_db_options.num_stripes == 0) { validated.num_stripes = 1; } return validated; } Status TransactionDB::Open(const Options& options, const TransactionDBOptions& txn_db_options, const std::string& dbname, TransactionDB** dbptr) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.emplace_back(kDefaultColumnFamilyName, cf_options); std::vector handles; Status s = TransactionDB::Open(db_options, txn_db_options, dbname, column_families, &handles, dbptr); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to // default column family delete handles[0]; } return s; } Status TransactionDB::Open( const DBOptions& db_options, const TransactionDBOptions& txn_db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, TransactionDB** dbptr) { Status s; DB* db = nullptr; if (txn_db_options.write_policy == WRITE_COMMITTED && db_options.unordered_write) { return Status::NotSupported( "WRITE_COMMITTED is incompatible with unordered_writes"); } if (txn_db_options.write_policy == WRITE_UNPREPARED && db_options.unordered_write) { // TODO(lth): support it return Status::NotSupported( "WRITE_UNPREPARED is currently incompatible with unordered_writes"); } if (txn_db_options.write_policy == WRITE_PREPARED && db_options.unordered_write && !db_options.two_write_queues) { return Status::NotSupported( "WRITE_PREPARED is incompatible with unordered_writes if " "two_write_queues is not enabled."); } std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; PrepareWrap(&db_options_2pc, &column_families_copy, &compaction_enabled_cf_indices); const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED; const bool use_batch_per_txn = txn_db_options.write_policy == WRITE_COMMITTED || txn_db_options.write_policy == WRITE_PREPARED; s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, use_seq_per_batch, use_batch_per_txn); if (s.ok()) { ROCKS_LOG_WARN(db->GetDBOptions().info_log, "Transaction write_policy is %" PRId32, static_cast(txn_db_options.write_policy)); // if WrapDB return non-ok, db will be deleted in WrapDB() via // ~StackableDB(). s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } return s; } void TransactionDB::PrepareWrap( DBOptions* db_options, std::vector* column_families, std::vector* compaction_enabled_cf_indices) { compaction_enabled_cf_indices->clear(); // Enable MemTable History if not already enabled for (size_t i = 0; i < column_families->size(); i++) { ColumnFamilyOptions* cf_options = &(*column_families)[i].options; if (cf_options->max_write_buffer_size_to_maintain == 0 && cf_options->max_write_buffer_number_to_maintain == 0) { // Setting to -1 will set the History size to // max_write_buffer_number * write_buffer_size. cf_options->max_write_buffer_size_to_maintain = -1; } if (!cf_options->disable_auto_compactions) { // Disable compactions momentarily to prevent race with DB::Open cf_options->disable_auto_compactions = true; compaction_enabled_cf_indices->push_back(i); } } db_options->allow_2pc = true; } namespace { template Status WrapAnotherDBInternal( DBType* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { assert(db != nullptr); assert(dbptr != nullptr); *dbptr = nullptr; std::unique_ptr txn_db; // txn_db owns object pointed to by the raw db pointer. switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: txn_db.reset(new WriteUnpreparedTxnDB( db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_PREPARED: txn_db.reset(new WritePreparedTxnDB( db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); break; case WRITE_COMMITTED: default: txn_db.reset(new WriteCommittedTxnDB( db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options))); } txn_db->UpdateCFComparatorMap(handles); Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles); // In case of a failure at this point, db is deleted via the txn_db destructor // and set to nullptr. if (s.ok()) { *dbptr = txn_db.release(); } else { for (auto* h : handles) { delete h; } // txn_db still owns db, and ~StackableDB() will be called when txn_db goes // out of scope, deleting the input db pointer. ROCKS_LOG_FATAL(db->GetDBOptions().info_log, "Failed to initialize txn_db: %s", s.ToString().c_str()); } return s; } } // namespace Status TransactionDB::WrapDB( // make sure this db is already opened with memtable history enabled, // auto compaction distabled and 2 phase commit enabled DB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { return WrapAnotherDBInternal(db, txn_db_options, compaction_enabled_cf_indices, handles, dbptr); } Status TransactionDB::WrapStackableDB( // make sure this stackable_db is already opened with memtable history // enabled, auto compaction distabled and 2 phase commit enabled StackableDB* db, const TransactionDBOptions& txn_db_options, const std::vector& compaction_enabled_cf_indices, const std::vector& handles, TransactionDB** dbptr) { return WrapAnotherDBInternal(db, txn_db_options, compaction_enabled_cf_indices, handles, dbptr); } // Let LockManager know that this column family exists so it can // allocate a LockMap for it. void PessimisticTransactionDB::AddColumnFamily( const ColumnFamilyHandle* handle) { lock_manager_->AddColumnFamily(handle); } Status PessimisticTransactionDB::CreateColumnFamily( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) { InstrumentedMutexLock l(&column_family_mutex_); Status s = VerifyCFOptions(options); if (!s.ok()) { return s; } s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { lock_manager_->AddColumnFamily(*handle); UpdateCFComparatorMap(*handle); } return s; } Status PessimisticTransactionDB::CreateColumnFamilies( const ColumnFamilyOptions& options, const std::vector& column_family_names, std::vector* handles) { InstrumentedMutexLock l(&column_family_mutex_); Status s = VerifyCFOptions(options); if (!s.ok()) { return s; } s = db_->CreateColumnFamilies(options, column_family_names, handles); if (s.ok()) { for (auto* handle : *handles) { lock_manager_->AddColumnFamily(handle); UpdateCFComparatorMap(handle); } } return s; } Status PessimisticTransactionDB::CreateColumnFamilies( const std::vector& column_families, std::vector* handles) { InstrumentedMutexLock l(&column_family_mutex_); for (auto& cf_desc : column_families) { Status s = VerifyCFOptions(cf_desc.options); if (!s.ok()) { return s; } } Status s = db_->CreateColumnFamilies(column_families, handles); if (s.ok()) { for (auto* handle : *handles) { lock_manager_->AddColumnFamily(handle); UpdateCFComparatorMap(handle); } } return s; } // Let LockManager know that it can deallocate the LockMap for this // column family. Status PessimisticTransactionDB::DropColumnFamily( ColumnFamilyHandle* column_family) { InstrumentedMutexLock l(&column_family_mutex_); Status s = db_->DropColumnFamily(column_family); if (s.ok()) { lock_manager_->RemoveColumnFamily(column_family); } return s; } Status PessimisticTransactionDB::DropColumnFamilies( const std::vector& column_families) { InstrumentedMutexLock l(&column_family_mutex_); Status s = db_->DropColumnFamilies(column_families); if (s.ok()) { for (auto* handle : column_families) { lock_manager_->RemoveColumnFamily(handle); } } return s; } Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); } Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, const Endpoint& start_endp, const Endpoint& end_endp) { return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(), /*exclusive=*/true); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, const LockTracker& keys) { lock_manager_->UnLock(txn, keys, GetEnv()); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key) { lock_manager_->UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction Transaction* PessimisticTransactionDB::BeginInternalTransaction( const WriteOptions& options) { TransactionOptions txn_options; Transaction* txn = BeginTransaction(options, txn_options, nullptr); // Use default timeout for non-transactional writes txn->SetLockTimeout(txn_db_options_.default_lock_timeout); return txn; } // All user Put, Merge, Delete, and Write requests must be intercepted to make // sure that they lock all keys that they are writing to avoid causing conflicts // with any concurrent transactions. The easiest way to do this is to wrap all // write operations in a transaction. // // Put(), Merge(), and Delete() only lock a single key per call. Write() will // sort its keys before locking them. This guarantees that TransactionDB write // methods cannot deadlock with each other (but still could deadlock with a // Transaction). Status PessimisticTransactionDB::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { Status s = FailIfCfEnablesTs(this, column_family); if (!s.ok()) { return s; } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do PutUntracked(). s = txn->PutUntracked(column_family, key, val); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { Status s = FailIfCfEnablesTs(this, column_family); if (!s.ok()) { return s; } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do // DeleteUntracked(). s = txn->DeleteUntracked(column_family, key); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { Status s = FailIfCfEnablesTs(this, column_family); if (!s.ok()) { return s; } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do // SingleDeleteUntracked(). s = txn->SingleDeleteUntracked(column_family, key); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status PessimisticTransactionDB::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { Status s = FailIfCfEnablesTs(this, column_family); if (!s.ok()) { return s; } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do // MergeUntracked(). s = txn->MergeUntracked(column_family, key, value); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status PessimisticTransactionDB::Write(const WriteOptions& opts, WriteBatch* updates) { return WriteWithConcurrencyControl(opts, updates); } Status WriteCommittedTxnDB::Write(const WriteOptions& opts, WriteBatch* updates) { Status s = FailIfBatchHasTs(updates); if (!s.ok()) { return s; } if (txn_db_options_.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { return WriteWithConcurrencyControl(opts, updates); } } Status WriteCommittedTxnDB::Write( const WriteOptions& opts, const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { Status s = FailIfBatchHasTs(updates); if (!s.ok()) { return s; } if (optimizations.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { return WriteWithConcurrencyControl(opts, updates); } } void PessimisticTransactionDB::InsertExpirableTransaction( TransactionID tx_id, PessimisticTransaction* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); } void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) { std::lock_guard lock(map_mutex_); expirable_transactions_map_.erase(tx_id); } bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( TransactionID tx_id) { std::lock_guard lock(map_mutex_); auto tx_it = expirable_transactions_map_.find(tx_id); if (tx_it == expirable_transactions_map_.end()) { return true; } PessimisticTransaction& tx = *(tx_it->second); return tx.TryStealingLocks(); } void PessimisticTransactionDB::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { auto txn_impl = static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } Transaction* PessimisticTransactionDB::GetTransactionByName( const TransactionName& name) { std::lock_guard lock(name_map_mutex_); auto it = transactions_.find(name); if (it == transactions_.end()) { return nullptr; } else { return it->second; } } void PessimisticTransactionDB::GetAllPreparedTransactions( std::vector* transv) { assert(transv); transv->clear(); std::lock_guard lock(name_map_mutex_); for (auto it = transactions_.begin(); it != transactions_.end(); ++it) { if (it->second->GetState() == Transaction::PREPARED) { transv->push_back(it->second); } } } LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() { return lock_manager_->GetPointLockStatus(); } std::vector PessimisticTransactionDB::GetDeadlockInfoBuffer() { return lock_manager_->GetDeadlockInfoBuffer(); } void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { lock_manager_->Resize(target_size); } void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { assert(txn); assert(txn->GetName().length() > 0); assert(GetTransactionByName(txn->GetName()) == nullptr); assert(txn->GetState() == Transaction::STARTED); std::lock_guard lock(name_map_mutex_); transactions_[txn->GetName()] = txn; } void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { assert(txn); std::lock_guard lock(name_map_mutex_); auto it = transactions_.find(txn->GetName()); assert(it != transactions_.end()); transactions_.erase(it); } std::pair> PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) { if (kMaxTxnTimestamp == ts) { return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr); } assert(db_impl_); return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts); } std::shared_ptr PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const { assert(db_impl_); return db_impl_->GetTimestampedSnapshot(ts); } void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan( TxnTimestamp ts) { assert(db_impl_); db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts); } Status PessimisticTransactionDB::GetTimestampedSnapshots( TxnTimestamp ts_lb, TxnTimestamp ts_ub, std::vector>& timestamped_snapshots) const { assert(db_impl_); return db_impl_->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots); } Status SnapshotCreationCallback::operator()(SequenceNumber seq, bool disable_memtable) { assert(db_impl_); assert(commit_ts_ != kMaxTxnTimestamp); const bool two_write_queues = db_impl_->immutable_db_options().two_write_queues; assert(!two_write_queues || !disable_memtable); #ifdef NDEBUG (void)two_write_queues; (void)disable_memtable; #endif const bool seq_per_batch = db_impl_->seq_per_batch(); if (!seq_per_batch) { assert(db_impl_->GetLastPublishedSequence() <= seq); } else { assert(db_impl_->GetLastPublishedSequence() < seq); } // Create a snapshot which can also be used for write conflict checking. auto ret = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_); snapshot_creation_status_ = ret.first; snapshot_ = ret.second; if (snapshot_creation_status_.ok()) { assert(snapshot_); } else { assert(!snapshot_); } if (snapshot_ && snapshot_notifier_) { snapshot_notifier_->SnapshotCreated(snapshot_.get()); } return Status::OK(); } } // namespace ROCKSDB_NAMESPACE