diff --git a/db/db_impl.h b/db/db_impl.h
index bc251492b7..011066bf44 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -673,10 +673,14 @@ class DBImpl : public DB {
   // batch which will be written to memtable later during the commit, and in
   // WritePrepared it is guaranteed since it will be used only for WAL markers
   // which will never be written to memtable.
+  // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
+  // the number of sub-batches. A sub-batch is a subset of the write batch
+  // that does not have duplicate keys.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
                    bool disable_memtable = false, uint64_t* seq_used = nullptr,
+                   size_t batch_cnt = 0,
                    PreReleaseCallback* pre_release_callback = nullptr);
 
   Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
@@ -685,10 +689,13 @@ class DBImpl : public DB {
                             bool disable_memtable = false,
                             uint64_t* seq_used = nullptr);
 
+  // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
+  // the number of sub-batches. A sub-batch is a subset of the write batch
+  // that does not have duplicate keys.
   Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
                           WriteCallback* callback = nullptr,
                           uint64_t* log_used = nullptr, uint64_t log_ref = 0,
-                          uint64_t* seq_used = nullptr,
+                          uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
                           PreReleaseCallback* pre_release_callback = nullptr);
 
   uint64_t FindMinLogContainingOutstandingPrep();
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index d1a2daf66c..d6a062702c 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -64,7 +64,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
                          WriteBatch* my_batch, WriteCallback* callback,
                          uint64_t* log_used, uint64_t log_ref,
                          bool disable_memtable, uint64_t* seq_used,
+                         size_t batch_cnt,
                          PreReleaseCallback* pre_release_callback) {
+  assert(!seq_per_batch_ || batch_cnt != 0);
   if (my_batch == nullptr) {
     return Status::Corruption("Batch is nullptr!");
   }
@@ -76,6 +78,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         "pipelined_writes is not compatible with concurrent prepares");
   }
   if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
+    // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
     return Status::NotSupported(
         "pipelined_writes is not compatible with seq_per_batch");
   }
@@ -93,7 +96,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
   if (two_write_queues_ && disable_memtable) {
     return WriteImplWALOnly(write_options, my_batch, callback, log_used,
-                            log_ref, seq_used, pre_release_callback);
+                            log_ref, seq_used, batch_cnt,
+                            pre_release_callback);
   }
 
   if (immutable_db_options_.enable_pipelined_write) {
@@ -103,7 +106,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
-                        disable_memtable, pre_release_callback);
+                        disable_memtable, batch_cnt, pre_release_callback);
 
   if (!write_options.disableWAL) {
     RecordTick(stats_, WRITE_WITH_WAL);
@@ -122,7 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       w.status = WriteBatchInternal::InsertInto(
           &w, w.sequence, &column_family_memtables, &flush_scheduler_,
           write_options.ignore_missing_column_families, 0 /*log_number*/, this,
-          true /*concurrent_memtable_writes*/, seq_per_batch_);
+          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
     }
 
     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
@@ -214,7 +217,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   uint64_t total_byte_size = 0;
   for (auto* writer : write_group) {
     if (writer->CheckCallback(this)) {
-      valid_batches++;
+      valid_batches += writer->batch_cnt;
       if (writer->ShouldWriteToMemtable()) {
         total_count += WriteBatchInternal::Count(writer->batch);
         parallel = parallel && !writer->batch->HasMerge();
@@ -303,7 +306,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       }
       writer->sequence = next_sequence;
       if (seq_per_batch_) {
-        next_sequence++;
+        assert(writer->batch_cnt);
+        next_sequence += writer->batch_cnt;
       } else if (writer->ShouldWriteToMemtable()) {
         next_sequence += WriteBatchInternal::Count(writer->batch);
       }
@@ -323,7 +327,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         w.status = WriteBatchInternal::InsertInto(
             &w, w.sequence, &column_family_memtables, &flush_scheduler_,
             write_options.ignore_missing_column_families, 0 /*log_number*/,
-            this, true /*concurrent_memtable_writes*/, seq_per_batch_);
+            this, true /*concurrent_memtable_writes*/, seq_per_batch_,
+            w.batch_cnt);
       }
     }
     if (seq_used != nullptr) {
@@ -515,12 +520,13 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
 Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
                                 WriteBatch* my_batch, WriteCallback* callback,
                                 uint64_t* log_used, uint64_t log_ref,
-                                uint64_t* seq_used,
+                                uint64_t* seq_used, size_t batch_cnt,
                                 PreReleaseCallback* pre_release_callback) {
   Status status;
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
-                        true /* disable_memtable */, pre_release_callback);
+                        true /* disable_memtable */, batch_cnt,
+                        pre_release_callback);
   RecordTick(stats_, WRITE_WITH_WAL);
   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
@@ -576,7 +582,15 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
   PERF_TIMER_GUARD(write_wal_time);
   // LastAllocatedSequence is increased inside WriteToWAL under
   // wal_write_mutex_ to ensure ordered events in WAL
-  size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
+  size_t seq_inc = 0 /* total_count */;
+  if (seq_per_batch_) {
+    size_t total_batch_cnt = 0;
+    for (auto* writer : write_group) {
+      assert(writer->batch_cnt);
+      total_batch_cnt += writer->batch_cnt;
+    }
+    seq_inc = total_batch_cnt;
+  }
   if (!write_options.disableWAL) {
     status =
         ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
@@ -591,7 +605,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
     }
     writer->sequence = curr_seq;
     if (seq_per_batch_) {
-      curr_seq++;
+      assert(writer->batch_cnt);
+      curr_seq += writer->batch_cnt;
     }
     // else seq advances only by memtable writes
   }
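The sequence accounting in the two hunks above is easiest to see in isolation. Below is a minimal standalone sketch (toy types such as `ToyWriter`, not the RocksDB classes): in seq_per_batch mode every writer consumes one sequence number per duplicate-free sub-batch, so a write group advances the last allocated sequence by the sum of the writers' `batch_cnt` values.

```cpp
// Toy model of per-sub-batch sequence allocation; not RocksDB code.
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <vector>

struct ToyWriter {        // stand-in for WriteThread::Writer
  size_t batch_cnt;       // number of duplicate-free sub-batches
  uint64_t sequence = 0;  // first seq assigned to this writer
};

int main() {
  uint64_t last_sequence = 100;
  std::vector<ToyWriter> group{{1}, {3}, {2}};  // a 3-writer group

  size_t seq_inc = 0;
  for (auto& w : group) {
    assert(w.batch_cnt != 0);  // mirrors assert(writer->batch_cnt)
    seq_inc += w.batch_cnt;
  }
  uint64_t curr_seq = last_sequence + 1;
  for (auto& w : group) {
    w.sequence = curr_seq;    // seq of this writer's first sub-batch
    curr_seq += w.batch_cnt;  // one seq per sub-batch
  }
  last_sequence += seq_inc;  // 100 -> 106
  std::printf("writers start at %llu, %llu, %llu; last=%llu\n",
              (unsigned long long)group[0].sequence,
              (unsigned long long)group[1].sequence,
              (unsigned long long)group[2].sequence,
              (unsigned long long)last_sequence);  // 101, 102, 105; 106
}
```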
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 3e82f836ce..7963c326bb 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -402,14 +402,21 @@ Status WriteBatch::Iterate(Handler* handler) const {
   bool empty_batch = true;
   int found = 0;
   Status s;
-  while (s.ok() && !input.empty() && handler->Continue()) {
-    char tag = 0;
-    uint32_t column_family = 0;  // default
+  char tag = 0;
+  uint32_t column_family = 0;  // default
+  while ((s.ok() || s.IsTryAgain()) && !input.empty() && handler->Continue()) {
+    if (!s.IsTryAgain()) {
+      tag = 0;
+      column_family = 0;  // default
 
-    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
-                                 &blob, &xid);
-    if (!s.ok()) {
-      return s;
+      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
+                                   &blob, &xid);
+      if (!s.ok()) {
+        return s;
+      }
+    } else {
+      assert(s.IsTryAgain());
+      s = Status::OK();
     }
 
     switch (tag) {
@@ -418,47 +425,59 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
-        empty_batch = false;
-        found++;
+        if (s.ok()) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyDeletion:
       case kTypeDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
-        empty_batch = false;
-        found++;
+        if (s.ok()) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilySingleDeletion:
       case kTypeSingleDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
-        empty_batch = false;
-        found++;
+        if (s.ok()) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyRangeDeletion:
       case kTypeRangeDeletion:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
-        empty_batch = false;
-        found++;
+        if (s.ok()) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyMerge:
       case kTypeMerge:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
-        empty_batch = false;
-        found++;
+        if (s.ok()) {
+          empty_batch = false;
+          found++;
+        }
         break;
       case kTypeColumnFamilyBlobIndex:
       case kTypeBlobIndex:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
         s = handler->PutBlobIndexCF(column_family, key, value);
-        found++;
+        if (s.ok()) {
+          found++;
+        }
         break;
       case kTypeLogData:
         handler->LogData(blob);
@@ -1084,12 +1103,23 @@ class MemTableInserter : public WriteBatch::Handler {
       MaybeAdvanceSeq();
       return seek_status;
     }
+    Status ret_status;
 
     MemTable* mem = cf_mems_->GetMemTable();
     auto* moptions = mem->GetImmutableMemTableOptions();
+    // inplace_update_support is inconsistent with snapshots, and therefore
+    // with any kind of transactions including the ones that use seq_per_batch
+    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
-      mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_,
-               get_post_process_info(mem));
+      bool mem_res =
+          mem->Add(sequence_, value_type, key, value,
+                   concurrent_memtable_writes_, get_post_process_info(mem));
+      if (!mem_res) {
+        assert(seq_per_batch_);
+        ret_status = Status::TryAgain("key+seq exists");
+        const bool BATCH_BOUNDARY = true;
+        MaybeAdvanceSeq(BATCH_BOUNDARY);
+      }
     } else if (moptions->inplace_callback == nullptr) {
       assert(!concurrent_memtable_writes_);
       mem->Update(sequence_, key, value);
@@ -1125,11 +1155,15 @@ class MemTableInserter : public WriteBatch::Handler {
                                          value, &merged_value);
         if (status == UpdateStatus::UPDATED_INPLACE) {
           // prev_value is updated in-place with final value.
-          mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size));
+          bool mem_res __attribute__((__unused__)) = mem->Add(
+              sequence_, value_type, key, Slice(prev_buffer, prev_size));
+          assert(mem_res);
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         } else if (status == UpdateStatus::UPDATED) {
           // merged_value contains the final value.
-          mem->Add(sequence_, value_type, key, Slice(merged_value));
+          bool mem_res __attribute__((__unused__)) =
+              mem->Add(sequence_, value_type, key, Slice(merged_value));
+          assert(mem_res);
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         }
       }
@@ -1139,7 +1173,7 @@ class MemTableInserter : public WriteBatch::Handler {
     // in memtable add/update.
     MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
   }
 
   virtual Status PutCF(uint32_t column_family_id, const Slice& key,
@@ -1149,12 +1183,20 @@
 
   Status DeleteImpl(uint32_t column_family_id, const Slice& key,
                     const Slice& value, ValueType delete_type) {
+    Status ret_status;
     MemTable* mem = cf_mems_->GetMemTable();
-    mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
-             get_post_process_info(mem));
+    bool mem_res =
+        mem->Add(sequence_, delete_type, key, value,
+                 concurrent_memtable_writes_, get_post_process_info(mem));
+    if (!mem_res) {
+      assert(seq_per_batch_);
+      ret_status = Status::TryAgain("key+seq exists");
+      const bool BATCH_BOUNDARY = true;
+      MaybeAdvanceSeq(BATCH_BOUNDARY);
+    }
     MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
   }
 
   virtual Status DeleteCF(uint32_t column_family_id,
@@ -1246,6 +1288,7 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
 
+    Status ret_status;
     MemTable* mem = cf_mems_->GetMemTable();
     auto* moptions = mem->GetImmutableMemTableOptions();
     bool perform_merge = false;
@@ -1301,18 +1344,30 @@ class MemTableInserter : public WriteBatch::Handler {
         perform_merge = false;
       } else {
         // 3) Add value to memtable
-        mem->Add(sequence_, kTypeValue, key, new_value);
+        bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
+        if (!mem_res) {
+          assert(seq_per_batch_);
+          ret_status = Status::TryAgain("key+seq exists");
+          const bool BATCH_BOUNDARY = true;
+          MaybeAdvanceSeq(BATCH_BOUNDARY);
+        }
       }
     }
 
     if (!perform_merge) {
       // Add merge operator to memtable
-      mem->Add(sequence_, kTypeMerge, key, value);
+      bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
+      if (!mem_res) {
+        assert(seq_per_batch_);
+        ret_status = Status::TryAgain("key+seq exists");
+        const bool BATCH_BOUNDARY = true;
+        MaybeAdvanceSeq(BATCH_BOUNDARY);
+      }
     }
 
     MaybeAdvanceSeq();
     CheckMemtableFull();
-    return Status::OK();
+    return ret_status;
   }
 
   virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
@@ -1496,6 +1551,8 @@ Status WriteBatchInternal::InsertInto(
     if (!w->status.ok()) {
       return w->status;
     }
+    assert(!seq_per_batch || w->batch_cnt != 0);
+    assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
   }
   return Status::OK();
 }
@@ -1504,7 +1561,7 @@ Status WriteBatchInternal::InsertInto(
     WriteThread::Writer* writer, SequenceNumber sequence,
     ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
     bool ignore_missing_column_families, uint64_t log_number, DB* db,
-    bool concurrent_memtable_writes, bool seq_per_batch) {
+    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt) {
   assert(writer->ShouldWriteToMemtable());
   MemTableInserter inserter(sequence, memtables, flush_scheduler,
                             ignore_missing_column_families, log_number, db,
@@ -1513,6 +1570,8 @@ Status WriteBatchInternal::InsertInto(
   SetSequence(writer->batch, sequence);
   inserter.set_log_number_ref(writer->log_ref);
   Status s = writer->batch->Iterate(&inserter);
+  assert(!seq_per_batch || batch_cnt != 0);
+  assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
   if (concurrent_memtable_writes) {
     inserter.PostProcess();
   }
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index a352fc89c1..aeaf0e1ea9 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -182,7 +182,7 @@ class WriteBatchInternal {
                          bool ignore_missing_column_families = false,
                          uint64_t log_number = 0, DB* db = nullptr,
                          bool concurrent_memtable_writes = false,
-                         bool seq_per_batch = false);
+                         bool seq_per_batch = false, size_t batch_cnt = 0);
 
   static Status Append(WriteBatch* dst, const WriteBatch* src,
                        const bool WAL_only = false);
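The Iterate/MemTableInserter changes above form a small retry protocol: MemTable::Add now reports failure on a duplicate (key, seq) insertion, the inserter cuts a batch boundary, advances the sequence, and returns Status::TryAgain, and WriteBatch::Iterate re-dispatches the *same* record instead of reading the next one. A minimal simulation of that loop (a plain std::set of (key, seq) pairs standing in for the memtable's uniqueness check; not RocksDB code):

```cpp
// Toy simulation of the TryAgain retry loop in WriteBatch::Iterate.
#include <cstdio>
#include <set>
#include <string>
#include <utility>
#include <vector>

int main() {
  std::vector<std::string> batch{"a", "b", "a", "a"};  // duplicate keys
  std::set<std::pair<std::string, int>> memtable;      // (key, seq) pairs
  int seq = 1;
  size_t i = 0;
  while (i < batch.size()) {
    const std::string& key = batch[i];
    if (memtable.insert({key, seq}).second) {
      std::printf("inserted %s@%d\n", key.c_str(), seq);
      ++i;  // success: move on to the next record
    } else {
      // Duplicate within the current sub-batch: advance seq at the batch
      // boundary and retry the same record -- the TryAgain path.
      ++seq;
    }
  }
  std::printf("sub-batches used: %d\n", seq);  // 3 for a, b, a, a
}
```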
diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc
index 036d51293c..7d04d50173 100644
--- a/db/write_callback_test.cc
+++ b/db/write_callback_test.cc
@@ -291,7 +291,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
             woptions.disableWAL = !enable_WAL;
             woptions.sync = enable_WAL;
             Status s;
-            if (seq_per_batch && two_queues) {
+            if (seq_per_batch) {
               class PublishSeqCallback : public PreReleaseCallback {
                public:
                 PublishSeqCallback(DBImpl* db_impl_in)
@@ -302,9 +302,13 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
                 }
                 DBImpl* db_impl_;
               } publish_seq_callback(db_impl);
-              s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
-                                     &write_op.callback_, nullptr, 0, false,
-                                     nullptr, &publish_seq_callback);
+              // seq_per_batch requires a natural batch separator or Noop
+              WriteBatchInternal::InsertNoop(&write_op.write_batch_);
+              const size_t ONE_BATCH = 1;
+              s = db_impl->WriteImpl(
+                  woptions, &write_op.write_batch_, &write_op.callback_,
+                  nullptr, 0, false, nullptr, ONE_BATCH,
+                  two_queues ? &publish_seq_callback : nullptr);
             } else {
               s = db_impl->WriteWithCallback(
                   woptions, &write_op.write_batch_, &write_op.callback_);
diff --git a/db/write_thread.h b/db/write_thread.h
index 3d5e2f9e6e..518b542992 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -118,6 +118,7 @@ class WriteThread {
     bool no_slowdown;
     bool disable_wal;
     bool disable_memtable;
+    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
     PreReleaseCallback* pre_release_callback;
     uint64_t log_used;  // log number that this batch was inserted into
     uint64_t log_ref;   // log number that memtable insert should reference
@@ -128,6 +129,7 @@ class WriteThread {
     SequenceNumber sequence;  // the sequence number to use for the first key
     Status status;            // status of memtable inserter
     Status callback_status;   // status returned by callback->Callback()
 
     std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
     std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
     Writer* link_older;  // read/write only before linking, or as leader
@@ -139,6 +141,7 @@ class WriteThread {
           no_slowdown(false),
           disable_wal(false),
           disable_memtable(false),
+          batch_cnt(0),
           pre_release_callback(nullptr),
           log_used(0),
           log_ref(0),
@@ -152,12 +155,14 @@ class WriteThread {
     Writer(const WriteOptions& write_options, WriteBatch* _batch,
            WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
+           size_t _batch_cnt = 0,
            PreReleaseCallback* _pre_release_callback = nullptr)
         : batch(_batch),
           sync(write_options.sync),
           no_slowdown(write_options.no_slowdown),
           disable_wal(write_options.disableWAL),
           disable_memtable(_disable_memtable),
+          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
           log_used(0),
           log_ref(_log_ref),
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
index f49cb3888c..84b2938157 100644
--- a/include/rocksdb/utilities/write_batch_with_index.h
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -228,13 +228,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
  private:
   friend class WritePreparedTxn;
-  // TODO(myabandeh): this is hackish, non-efficient solution to enable the e2e
-  // unit tests. Replace it with a proper solution. Collapse the WriteBatch to
-  // remove the duplicate keys. The index will not be updated after this.
-  // Returns false if collapse was not necessary
-  bool Collapse();
-  void DisableDuplicateMergeKeys() { allow_dup_merge_ = false; }
-  bool allow_dup_merge_ = true;
+  // Returns true if there are duplicate keys in the batch.
+  bool HasDuplicateKeys();
 
   Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
                            ColumnFamilyHandle* column_family, const Slice& key,
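A toy model of why the new HasDuplicateKeys() query (which replaces Collapse() above) can be constant time: the indexed batch already records the offset of the overwritten entry whenever a key repeats, so duplicate detection reduces to checking whether that list is non-empty. Names below (ToyIndexedBatch, obsolete_offsets) mirror the real members but are illustrative only:

```cpp
// Toy sketch of offset-based duplicate tracking; not the RocksDB classes.
#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct ToyIndexedBatch {
  std::map<std::string, size_t> index;   // key -> offset of latest entry
  std::vector<size_t> obsolete_offsets;  // entries superseded by a rewrite
  size_t next_offset = 0;

  void Put(const std::string& key) {
    auto it = index.find(key);
    if (it != index.end()) {
      obsolete_offsets.push_back(it->second);  // key was written before
    }
    index[key] = next_offset++;
  }
  bool HasDuplicateKeys() const { return !obsolete_offsets.empty(); }  // O(1)
};

int main() {
  ToyIndexedBatch b;
  b.Put("foo");
  b.Put("bar");
  b.Put("foo");  // duplicate
  std::printf("dup=%d\n", b.HasDuplicateKeys() ? 1 : 0);  // dup=1
}
```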
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 203dd2071c..3a7d801aa8 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -307,7 +307,7 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
   return s;
 }
 
-Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch) {
+Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
   Status s = db_->Write(write_options_, batch);
   return s;
 }
diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h
index 51776b8001..0434a69db8 100644
--- a/utilities/transactions/pessimistic_transaction.h
+++ b/utilities/transactions/pessimistic_transaction.h
@@ -121,7 +121,10 @@ class PessimisticTransaction : public TransactionBaseImpl {
 
   virtual Status CommitWithoutPrepareInternal() = 0;
 
-  virtual Status CommitBatchInternal(WriteBatch* batch) = 0;
+  // batch_cnt, if non-zero, is the number of sub-batches. A sub-batch is a
+  // batch with no duplicate keys. If zero, the number of sub-batches is
+  // unknown.
+  virtual Status CommitBatchInternal(WriteBatch* batch,
+                                     size_t batch_cnt = 0) = 0;
 
   virtual Status CommitInternal() = 0;
 
@@ -204,7 +207,7 @@ class WriteCommittedTxn : public PessimisticTransaction {
 
   Status CommitWithoutPrepareInternal() override;
 
-  Status CommitBatchInternal(WriteBatch* batch) override;
+  Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
 
   Status CommitInternal() override;
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 9fff27e77c..0aad76a502 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -82,12 +82,29 @@ PessimisticTransactionDB::~PessimisticTransactionDB() {
   }
 }
 
+Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
+  return Status::OK();
+}
+
 Status PessimisticTransactionDB::Initialize(
     const std::vector<size_t>& compaction_enabled_cf_indices,
     const std::vector<ColumnFamilyHandle*>& handles) {
   for (auto cf_ptr : handles) {
     AddColumnFamily(cf_ptr);
   }
+  // Verify cf options
+  for (auto handle : handles) {
+    ColumnFamilyDescriptor cfd;
+    Status s = handle->GetDescriptor(&cfd);
+    if (!s.ok()) {
+      return s;
+    }
+    s = VerifyCFOptions(cfd.options);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
   // Re-enable compaction for the column families that initially had
   // compaction enabled.
   std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
@@ -158,6 +175,30 @@ TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
   return validated;
 }
 
+void PessimisticTransactionDB::UpdateCFComparatorMap(
+    const std::vector<ColumnFamilyHandle*>& handles) {
+  auto cf_map = new std::map<uint32_t, const Comparator*>();
+  for (auto h : handles) {
+    auto id = h->GetID();
+    const Comparator* comparator = h->GetComparator();
+    (*cf_map)[id] = comparator;
+  }
+  cf_map_.store(cf_map);
+  cf_map_gc_.reset(cf_map);
+}
+
+void PessimisticTransactionDB::UpdateCFComparatorMap(
+    const ColumnFamilyHandle* h) {
+  auto old_cf_map_ptr = cf_map_.load();
+  assert(old_cf_map_ptr);
+  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
+  auto id = h->GetID();
+  const Comparator* comparator = h->GetComparator();
+  (*cf_map)[id] = comparator;
+  cf_map_.store(cf_map);
+  cf_map_gc_.reset(cf_map);
+}
+
 Status TransactionDB::Open(const Options& options,
                            const TransactionDBOptions& txn_db_options,
                            const std::string& dbname, TransactionDB** dbptr) {
@@ -245,6 +286,7 @@ Status TransactionDB::WrapDB(
     txn_db = new WriteCommittedTxnDB(
         db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
   }
+  txn_db->UpdateCFComparatorMap(handles);
   *dbptr = txn_db;
   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
   return s;
@@ -270,6 +312,7 @@ Status TransactionDB::WrapStackableDB(
     txn_db = new WriteCommittedTxnDB(
         db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
   }
+  txn_db->UpdateCFComparatorMap(handles);
   *dbptr = txn_db;
   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
   return s;
@@ -286,10 +329,15 @@ Status PessimisticTransactionDB::CreateColumnFamily(
     const ColumnFamilyOptions& options, const std::string& column_family_name,
     ColumnFamilyHandle** handle) {
   InstrumentedMutexLock l(&column_family_mutex_);
+  Status s = VerifyCFOptions(options);
+  if (!s.ok()) {
+    return s;
+  }
 
-  Status s = db_->CreateColumnFamily(options, column_family_name, handle);
+  s = db_->CreateColumnFamily(options, column_family_name, handle);
   if (s.ok()) {
     lock_mgr_.AddColumnFamily((*handle)->GetID());
+    UpdateCFComparatorMap(*handle);
   }
 
   return s;
@@ -439,8 +487,6 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
   // concurrent transactions.
   Transaction* txn = BeginInternalTransaction(opts);
   txn->DisableIndexing();
-  // TODO(myabandeh): indexing being disabled we need another machanism to
-  // detect duplicattes in the input patch
 
   auto txn_impl =
       static_cast_with_check<PessimisticTransaction, Transaction>(txn);
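UpdateCFComparatorMap above follows a copy-on-write pattern: readers dereference an atomic pointer to an immutable map, while the writer (serialized by the column family mutex) copies, mutates, publishes, and hands ownership of the newest map to a unique_ptr for eventual cleanup. A minimal sketch of the same pattern, with the caveat (true of the real code as well) that a superseded map is freed as soon as the next one is published:

```cpp
// Toy copy-on-write map with an atomic publish pointer; not RocksDB code.
#include <atomic>
#include <cstdio>
#include <map>
#include <memory>
#include <string>

struct CowMap {
  std::atomic<std::map<int, std::string>*> current{nullptr};
  std::unique_ptr<std::map<int, std::string>> gc;  // owns the latest map

  void Update(int id, std::string name) {  // must be externally serialized
    auto old_ptr = current.load();
    auto next = old_ptr ? new std::map<int, std::string>(*old_ptr)
                        : new std::map<int, std::string>();
    (*next)[id] = std::move(name);
    current.store(next);  // publish to readers
    gc.reset(next);       // retire the previously owned map
  }
};

int main() {
  CowMap m;
  m.Update(0, "default");
  m.Update(1, "two");
  std::printf("cf1=%s\n", (*m.current.load())[1].c_str());  // cf1=two
}
```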
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index 4311e88c35..9b4ad5618e 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -113,15 +113,27 @@ class PessimisticTransactionDB : public TransactionDB {
   std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
   void SetDeadlockInfoBufferSize(uint32_t target_size) override;
 
+  void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>& handles);
+  void UpdateCFComparatorMap(const ColumnFamilyHandle* handle);
+  std::map<uint32_t, const Comparator*>* GetCFComparatorMap() {
+    return cf_map_.load();
+  }
+
  protected:
   DBImpl* db_impl_;
   std::shared_ptr<Logger> info_log_;
   const TransactionDBOptions txn_db_options_;
+  // A cache of the cf comparators
+  std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_;
+  // GC of the object above
+  std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_;
 
   void ReinitializeTransaction(
       Transaction* txn, const WriteOptions& write_options,
       const TransactionOptions& txn_options = TransactionOptions());
 
+  virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
+
  private:
   friend class WritePreparedTxnDB;
   friend class WritePreparedTxnDBMock;
diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h
index 05dca1da4a..d336eb3a78 100644
--- a/utilities/transactions/transaction_base.h
+++ b/utilities/transactions/transaction_base.h
@@ -232,7 +232,7 @@ class TransactionBaseImpl : public Transaction {
 
   // iterates over the given batch and makes the appropriate inserts.
   // used for rebuilding prepared transactions after recovery.
-  Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
+  virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
 
   WriteBatch* GetCommitTimeWriteBatch() override;
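The two hooks added above (a virtual VerifyCFOptions and a now-virtual RebuildFromWriteBatch) let WritePrepared specialize base-class behavior. Below is a simplified stand-in (toy types, not the RocksDB classes) showing the shape of the verification hook; the real override and its check, memtable_factory->CanHandleDuplicatedKey(), appear later in this diff:

```cpp
// Toy sketch of the template-method-style VerifyCFOptions hook.
#include <cstdio>
#include <string>

struct ToyCFOptions {
  bool memtable_handles_dup_key_seq = true;
};

struct ToyTxnDB {
  virtual ~ToyTxnDB() {}
  virtual bool VerifyCFOptions(const ToyCFOptions&) { return true; }  // base: no-op
  bool CreateColumnFamily(const ToyCFOptions& opts, const std::string& name) {
    if (!VerifyCFOptions(opts)) {  // every new CF goes through the hook
      std::printf("rejected cf %s\n", name.c_str());
      return false;
    }
    std::printf("created cf %s\n", name.c_str());
    return true;
  }
};

struct ToyWritePreparedTxnDB : ToyTxnDB {
  bool VerifyCFOptions(const ToyCFOptions& opts) override {
    // WritePrepared needs a memtable that accepts duplicate (key, seq) adds
    return opts.memtable_handles_dup_key_seq;
  }
};

int main() {
  ToyWritePreparedTxnDB db;
  db.CreateColumnFamily(ToyCFOptions{true}, "ok");
  db.CreateColumnFamily(ToyCFOptions{false}, "vector_memtable");
}
```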
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 129e9f5764..d80dd05d04 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -70,6 +70,25 @@ TEST_P(TransactionTest, DoubleEmptyWrite) {
 
   ASSERT_OK(db->Write(write_options, &batch));
   ASSERT_OK(db->Write(write_options, &batch));
+
+  // Also test committing empty transactions in 2PC
+  TransactionOptions txn_options;
+  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn0->SetName("xid"));
+  ASSERT_OK(txn0->Prepare());
+  ASSERT_OK(txn0->Commit());
+  delete txn0;
+
+  // Also test that it works during recovery
+  txn0 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn0->SetName("xid2"));
+  txn0->Put(Slice("foo0"), Slice("bar0a"));
+  ASSERT_OK(txn0->Prepare());
+  delete txn0;
+  ASSERT_OK(ReOpenNoDelete());
+  txn0 = db->GetTransactionByName("xid2");
+  ASSERT_OK(txn0->Commit());
+  delete txn0;
 }
 
 TEST_P(TransactionTest, SuccessTest) {
@@ -2874,12 +2893,8 @@ TEST_P(TransactionTest, UntrackedWrites) {
   // Untracked writes should succeed even though key was written after snapshot
   s = txn->PutUntracked("untracked", "1");
   ASSERT_OK(s);
-  if (txn_db_options.write_policy != WRITE_PREPARED) {
-    // WRITE_PREPARED does not currently support dup merge keys.
-    // TODO(myabandeh): remove this if-then when the support is added
-    s = txn->MergeUntracked("untracked", "2");
-    ASSERT_OK(s);
-  }
+  s = txn->MergeUntracked("untracked", "2");
+  ASSERT_OK(s);
   s = txn->DeleteUntracked("untracked");
   ASSERT_OK(s);
 
@@ -4250,11 +4265,6 @@ TEST_P(TransactionTest, SingleDeleteTest) {
 }
 
 TEST_P(TransactionTest, MergeTest) {
-  if (txn_db_options.write_policy == WRITE_PREPARED) {
-    // WRITE_PREPARED does not currently support dup merge keys.
-    // TODO(myabandeh): remove this if-then when the support is added
-    return;
-  }
   WriteOptions write_options;
   ReadOptions read_options;
   string value;
@@ -4978,65 +4988,273 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
 }
 
 // Test that the transactional db can handle duplicate keys in the write batch
-TEST_P(TransactionTest, DuplicateKeyTest) {
+TEST_P(TransactionTest, DuplicateKeys) {
+  ColumnFamilyOptions cf_options;
+  std::string cf_name = "two";
+  ColumnFamilyHandle* cf_handle = nullptr;
+  {
+    db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
+    WriteOptions write_options;
+    WriteBatch batch;
+    batch.Put(Slice("key"), Slice("value"));
+    batch.Put(Slice("key2"), Slice("value2"));
+    // duplicate the keys
+    batch.Put(Slice("key"), Slice("value3"));
+    // duplicate the 2nd key. It should not be counted as a duplicate since a
+    // sub-batch is cut after the last duplicate.
+    batch.Put(Slice("key2"), Slice("value4"));
+    // duplicate the keys but in a different cf. It should not be counted as
+    // duplicate keys
+    batch.Put(cf_handle, Slice("key"), Slice("value5"));
+
+    ASSERT_OK(db->Write(write_options, &batch));
+
+    ReadOptions ropt;
+    PinnableSlice pinnable_val;
+    auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("value3"));
+    s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("value4"));
+    s = db->Get(ropt, cf_handle, "key", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("value5"));
+
+    delete cf_handle;
+  }
+
+  // Test with non-bytewise comparator
+  {
+    // A comparator that uses only the first three bytes
+    class ThreeBytewiseComparator : public Comparator {
+     public:
+      ThreeBytewiseComparator() {}
+      virtual const char* Name() const override {
+        return "test.ThreeBytewiseComparator";
+      }
+      virtual int Compare(const Slice& a, const Slice& b) const override {
+        Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
+        Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
+        return na.compare(nb);
+      }
+      virtual bool Equal(const Slice& a, const Slice& b) const override {
+        Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
+        Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
+        return na == nb;
+      }
+      // The methods below don't seem relevant to this test. Implement them
+      // if proven otherwise.
+      void FindShortestSeparator(std::string* start,
+                                 const Slice& limit) const override {
+        const Comparator* bytewise_comp = BytewiseComparator();
+        bytewise_comp->FindShortestSeparator(start, limit);
+      }
+      void FindShortSuccessor(std::string* key) const override {
+        const Comparator* bytewise_comp = BytewiseComparator();
+        bytewise_comp->FindShortSuccessor(key);
+      }
+    };
+    ReOpen();
+    std::unique_ptr<Comparator> comp_gc(new ThreeBytewiseComparator());
+    cf_options.comparator = comp_gc.get();
+    ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
+    WriteOptions write_options;
+    WriteBatch batch;
+    batch.Put(cf_handle, Slice("key"), Slice("value"));
+    // The first three bytes are the same, so it must be counted as a
+    // duplicate
+    batch.Put(cf_handle, Slice("key2"), Slice("value2"));
+    ASSERT_OK(db->Write(write_options, &batch));
+
+    // The value must be the most recent value for all the keys equal to
+    // "key", including "key2"
+    ReadOptions ropt;
+    PinnableSlice pinnable_val;
+    ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
+    ASSERT_TRUE(pinnable_val == ("value2"));
+
+    // Test duplicate keys with rollback
+    TransactionOptions txn_options;
+    Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+    ASSERT_OK(txn0->SetName("xid"));
+    ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
+    ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
+    ASSERT_OK(txn0->Rollback());
+    ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
+    ASSERT_TRUE(pinnable_val == ("value2"));
+    delete txn0;
+
+    delete cf_handle;
+    cf_options.comparator = BytewiseComparator();
+  }
+  for (bool do_prepare : {true, false}) {
+    for (bool do_rollback : {true, false}) {
+      for (bool with_commit_batch : {true, false}) {
+        if (with_commit_batch && !do_prepare) {
+          continue;
+        }
+        if (with_commit_batch && do_rollback) {
+          continue;
+        }
+        ReOpen();
+        db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
+        TransactionOptions txn_options;
+        txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
+        WriteOptions write_options;
+        Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+        auto s = txn0->SetName("xid");
+        ASSERT_OK(s);
+        s = txn0->Put(Slice("foo0"), Slice("bar0a"));
+        ASSERT_OK(s);
+        s = txn0->Put(Slice("foo0"), Slice("bar0b"));
+        ASSERT_OK(s);
+        s = txn0->Put(Slice("foo1"), Slice("bar1"));
+        ASSERT_OK(s);
+        s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
+        ASSERT_OK(s);
+        // Repeat a key after the start of a sub-batch. This should not cause
+        // a duplicate in the most recent sub-batch and hence does not create
+        // a new sub-batch.
+        s = txn0->Put(Slice("foo0"), Slice("bar0c"));
+        ASSERT_OK(s);
+        s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
+        ASSERT_OK(s);
+        // duplicate the keys but in a different cf. It should not be counted
+        // as duplicate.
+        s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
+        ASSERT_OK(s);
+        s = txn0->Put(Slice("foo3"), Slice("bar3"));
+        ASSERT_OK(s);
+        s = txn0->Merge(Slice("foo3"), Slice("bar3"));
+        ASSERT_OK(s);
+        s = txn0->Put(Slice("foo4"), Slice("bar4"));
+        ASSERT_OK(s);
+        s = txn0->Delete(Slice("foo4"));
+        ASSERT_OK(s);
+        s = txn0->SingleDelete(Slice("foo4"));
+        ASSERT_OK(s);
+        if (do_prepare) {
+          s = txn0->Prepare();
+          ASSERT_OK(s);
+        }
+        if (do_rollback) {
+          // Test rolling back the batch with duplicates
+          s = txn0->Rollback();
+          ASSERT_OK(s);
+        } else {
+          if (with_commit_batch) {
+            assert(do_prepare);
+            auto cb = txn0->GetCommitTimeWriteBatch();
+            // duplicate a key in the original batch
+            // TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
+            // conflicting with the prepared batch is currently undefined and
+            // gives different results in different implementations.
+            // s = cb->Put(Slice("foo0"), Slice("bar0d"));
+            // ASSERT_OK(s);
+            // add a new duplicate key
+            s = cb->Put(Slice("foo6"), Slice("bar6a"));
+            ASSERT_OK(s);
+            s = cb->Put(Slice("foo6"), Slice("bar6b"));
+            ASSERT_OK(s);
+            // add a duplicate key that is removed in the same batch
+            s = cb->Put(Slice("foo7"), Slice("bar7a"));
+            ASSERT_OK(s);
+            s = cb->Delete(Slice("foo7"));
+            ASSERT_OK(s);
+          }
+          s = txn0->Commit();
+          ASSERT_OK(s);
+        }
+        if (!do_prepare && !do_rollback) {
+          auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
+          pdb->UnregisterTransaction(txn0);
+        }
+        delete txn0;
+        ReadOptions ropt;
+        PinnableSlice pinnable_val;
+
+        if (do_rollback) {
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+        } else {
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
+          ASSERT_OK(s);
+          ASSERT_TRUE(pinnable_val == ("bar0c"));
+          s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
+          ASSERT_OK(s);
+          ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
+          ASSERT_OK(s);
+          ASSERT_TRUE(pinnable_val == ("bar1"));
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
+          ASSERT_OK(s);
+          ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
+          ASSERT_OK(s);
+          ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
+          s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
+          ASSERT_TRUE(s.IsNotFound());
+          if (with_commit_batch) {
+            s = db->Get(ropt, db->DefaultColumnFamily(), "foo6",
+                        &pinnable_val);
+            ASSERT_OK(s);
+            ASSERT_TRUE(pinnable_val == ("bar6b"));
+            s = db->Get(ropt, db->DefaultColumnFamily(), "foo7",
+                        &pinnable_val);
+            ASSERT_TRUE(s.IsNotFound());
+          }
+        }
+        delete cf_handle;
+      }  // with_commit_batch
+    }    // do_rollback
+  }      // do_prepare
+
+  {
+    // Also test with max_successive_merges > 0. max_successive_merges will
+    // not affect our algorithm for duplicate key insertion but we add the
+    // test to verify that.
+    cf_options.max_successive_merges = 2;
+    cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
+    ReOpen();
+    db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
+    WriteOptions write_options;
+    // Ensure one value for the key
+    db->Put(write_options, cf_handle, Slice("key"), Slice("value"));
+    WriteBatch batch;
+    // Merge more than max_successive_merges times
+    batch.Merge(cf_handle, Slice("key"), Slice("1"));
+    batch.Merge(cf_handle, Slice("key"), Slice("2"));
+    batch.Merge(cf_handle, Slice("key"), Slice("3"));
+    batch.Merge(cf_handle, Slice("key"), Slice("4"));
+    ASSERT_OK(db->Write(write_options, &batch));
+    ReadOptions read_options;
+    string value;
+    ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
+    ASSERT_EQ(value, "value,1,2,3,4");
+    delete cf_handle;
+  }
+
+  {
+    // Test that the duplicate detection is not compromised after rolling
+    // back to a save point
     TransactionOptions txn_options;
     WriteOptions write_options;
     Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
-    auto s = txn0->SetName("xid");
-    ASSERT_OK(s);
-    s = txn0->Put(Slice("foo0"), Slice("bar0a"));
-    ASSERT_OK(s);
-    s = txn0->Put(Slice("foo0"), Slice("bar0b"));
-    ASSERT_OK(s);
-    s = txn0->Put(Slice("foo1"), Slice("bar1"));
-    ASSERT_OK(s);
-    s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
-    ASSERT_OK(s);
-    // TODO(myabandeh): enable this after duplicatae merge keys are supported
-    // s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
-    // ASSERT_OK(s);
-    s = txn0->Put(Slice("foo2"), Slice("bar2b"));
-    ASSERT_OK(s);
-    s = txn0->Put(Slice("foo3"), Slice("bar3"));
-    ASSERT_OK(s);
-    // TODO(myabandeh): enable this after duplicatae merge keys are supported
-    // s = txn0->Merge(Slice("foo3"), Slice("bar3"));
-    // ASSERT_OK(s);
-    s = txn0->Put(Slice("foo4"), Slice("bar4"));
-    ASSERT_OK(s);
-    s = txn0->Delete(Slice("foo4"));
-    ASSERT_OK(s);
-    s = txn0->SingleDelete(Slice("foo4"));
-    ASSERT_OK(s);
-    if (do_prepare) {
-      s = txn0->Prepare();
-      ASSERT_OK(s);
-    }
-    s = txn0->Commit();
-    ASSERT_OK(s);
-    if (!do_prepare) {
-      auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
-      pdb->UnregisterTransaction(txn0);
-    }
-    delete txn0;
-    ReadOptions ropt;
-    PinnableSlice pinnable_val;
-
-    s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
-    ASSERT_OK(s);
-    ASSERT_TRUE(pinnable_val == ("bar0b"));
-    s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
-    ASSERT_OK(s);
-    ASSERT_TRUE(pinnable_val == ("bar1"));
-    s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
-    ASSERT_OK(s);
-    ASSERT_TRUE(pinnable_val == ("bar2b"));
-    s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
-    ASSERT_OK(s);
-    ASSERT_TRUE(pinnable_val == ("bar3"));
-    s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
-    ASSERT_TRUE(s.IsNotFound());
+    ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
+    ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
+    txn0->SetSavePoint();
+    txn0->RollbackToSavePoint();
+    ASSERT_OK(txn0->Commit());
   }
 }
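One detail the non-bytewise-comparator test above pins down: "duplicate" is defined by the column family's comparator, not by byte equality. Under a comparator that inspects only the first three bytes, "key" and "key2" collide and force a second sub-batch. A self-contained illustration of that counting (plain std::set and std::string, not the RocksDB types):

```cpp
// Toy sub-batch count under a 3-byte comparator; not RocksDB code.
#include <cstdio>
#include <set>
#include <string>

struct ThreeByteLess {
  bool operator()(const std::string& a, const std::string& b) const {
    return a.substr(0, 3) < b.substr(0, 3);  // compare first 3 bytes only
  }
};

int main() {
  std::set<std::string, ThreeByteLess> keys;
  size_t batches = 1;
  for (const std::string& k : {"key", "key2"}) {
    if (!keys.insert(k).second) {  // "key2" collides with "key"
      batches++;                   // cut a new sub-batch
      keys.clear();
      keys.insert(k);
    }
  }
  std::printf("sub-batches: %zu\n", batches);  // 2
}
```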
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index 7fea5a9df8..01515000d0 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -13,6 +13,7 @@
 #include <inttypes.h>
 
 #include <map>
+#include <set>
 
 #include "db/column_family.h"
 #include "db/db_impl.h"
@@ -30,9 +31,7 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                    const WriteOptions& write_options,
                                    const TransactionOptions& txn_options)
     : PessimisticTransaction(txn_db, write_options, txn_options),
-      wpt_db_(txn_db) {
-  GetWriteBatch()->DisableDuplicateMergeKeys();
-}
+      wpt_db_(txn_db) {}
 
 Status WritePreparedTxn::Get(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family,
@@ -63,6 +62,69 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
   return write_batch_.NewIteratorWithBase(column_family, db_iter);
 }
 
+namespace {
+// A wrapper around Comparator to make it usable in std::set
+struct SetComparator {
+  explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
+  explicit SetComparator(const Comparator* user_comparator)
+      : user_comparator_(user_comparator ? user_comparator
+                                         : BytewiseComparator()) {}
+  bool operator()(const Slice& lhs, const Slice& rhs) const {
+    return user_comparator_->Compare(lhs, rhs) < 0;
+  }
+
+ private:
+  const Comparator* user_comparator_;
+};
+// Count the number of sub-batches inside a batch. A sub-batch does not have
+// duplicate keys.
+struct SubBatchCounter : public WriteBatch::Handler {
+  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
+      : comparators_(comparators), batches_(1) {}
+  std::map<uint32_t, const Comparator*>& comparators_;
+  using CFKeys = std::set<Slice, SetComparator>;
+  std::map<uint32_t, CFKeys> keys_;
+  size_t batches_;
+  size_t BatchCount() { return batches_; }
+  void AddKey(uint32_t cf, const Slice& key) {
+    CFKeys& cf_keys = keys_[cf];
+    if (cf_keys.size() == 0) {  // just inserted
+      auto cmp = comparators_[cf];
+      keys_[cf] = CFKeys(SetComparator(cmp));
+    }
+    auto it = cf_keys.insert(key);
+    if (it.second == false) {  // second is false if an element already existed
+      batches_++;
+      keys_.clear();
+      keys_[cf].insert(key);
+    }
+  }
+  Status MarkNoop(bool) override { return Status::OK(); }
+  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
+  Status MarkCommit(const Slice&) override { return Status::OK(); }
+
+  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status DeleteCF(uint32_t cf, const Slice& key) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
+    AddKey(cf, key);
+    return Status::OK();
+  }
+  Status MarkBeginPrepare() override { return Status::OK(); }
+  Status MarkRollback(const Slice&) override { return Status::OK(); }
+  bool WriteAfterCommit() const override { return false; }
+};
+}  // namespace
+
 Status WritePreparedTxn::PrepareInternal() {
   WriteOptions write_options = write_options_;
   write_options.disableWAL = false;
@@ -71,15 +133,18 @@ Status WritePreparedTxn::PrepareInternal() {
                                     !WRITE_AFTER_COMMIT);
   const bool DISABLE_MEMTABLE = true;
   uint64_t seq_used = kMaxSequenceNumber;
-  bool collapsed = GetWriteBatch()->Collapse();
-  if (collapsed) {
-    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
-                   "Collapse overhead due to duplicate keys");
+  // For each duplicate key we account for a new sub-batch
+  prepare_batch_cnt_ = 1;
+  if (GetWriteBatch()->HasDuplicateKeys()) {
+    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
+    auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
+    assert(s.ok());
+    prepare_batch_cnt_ = counter.BatchCount();
   }
   Status s =
       db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                           /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
-                          !DISABLE_MEMTABLE, &seq_used);
+                          !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   auto prepare_seq = seq_used;
   SetId(prepare_seq);
@@ -93,18 +158,32 @@ Status WritePreparedTxn::PrepareInternal() {
 }
 
 Status WritePreparedTxn::CommitWithoutPrepareInternal() {
-  bool collapsed = GetWriteBatch()->Collapse();
-  if (collapsed) {
-    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
-                   "Collapse overhead due to duplicate keys");
+  // For each duplicate key we account for a new sub-batch
+  size_t batch_cnt = 1;
+  if (GetWriteBatch()->HasDuplicateKeys()) {
+    batch_cnt = 0;  // this will trigger a batch cnt compute
   }
-  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
+  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
 }
 
-Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
+Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
+                                             size_t batch_cnt) {
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                     "CommitBatchInternal");
-  // TODO(myabandeh): handle the duplicate keys in the batch
+  if (batch->Count() == 0) {
+    // Otherwise our one-seq-per-batch logic will break, since no seq is
+    // allocated for this batch.
+    return Status::OK();
+  }
+  if (batch_cnt == 0) {  // not provided, then compute it
+    // TODO(myabandeh): add an option to allow user skipping this cost
+    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
+    auto s = batch->Iterate(&counter);
+    assert(s.ok());
+    batch_cnt = counter.BatchCount();
+  }
+  assert(batch_cnt);
+
   bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
   bool sync = write_options_.sync;
   if (!do_one_write) {
@@ -116,12 +195,12 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
   const bool DISABLE_MEMTABLE = true;
   const uint64_t no_log_ref = 0;
   uint64_t seq_used = kMaxSequenceNumber;
-  const bool INCLUDES_DATA = true;
+  const size_t ZERO_PREPARES = 0;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
-  auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
-                               no_log_ref, !DISABLE_MEMTABLE, &seq_used,
-                               do_one_write ? &update_commit_map : nullptr);
+      wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
+  auto s = db_impl_->WriteImpl(
+      write_options_, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE,
+      &seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   uint64_t& prepare_seq = seq_used;
   SetId(prepare_seq);
@@ -144,13 +223,14 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
   // Commit the batch by writing an empty batch to the 2nd queue that will
   // release the commit sequence number to readers.
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, prepare_seq);
+      wpt_db_, db_impl_, prepare_seq, batch_cnt);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
+  const size_t ONE_BATCH = 1;
   // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(&empty_batch);
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                          no_log_ref, DISABLE_MEMTABLE, &seq_used,
+                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   return s;
@@ -175,17 +255,26 @@ Status WritePreparedTxn::CommitInternal() {
   auto prepare_seq = GetId();
   const bool includes_data = !empty && !for_recovery;
+  assert(prepare_batch_cnt_);
+  size_t commit_batch_cnt = 0;
+  if (includes_data) {
+    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
+    auto s = working_batch->Iterate(&counter);
+    assert(s.ok());
+    commit_batch_cnt = counter.BatchCount();
+  }
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, prepare_seq, includes_data);
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
   const bool disable_memtable = !includes_data;
   uint64_t seq_used = kMaxSequenceNumber;
   // Since the prepared batch is directly written to memtable, there is
   // already a connection between the memtable and its WAL, so there is no
   // need to redundantly reference the log that contains the prepared data.
   const uint64_t zero_log_number = 0ull;
+  size_t batch_cnt = commit_batch_cnt ? commit_batch_cnt : 1;
   auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                                zero_log_number, disable_memtable, &seq_used,
-                               &update_commit_map);
+                               batch_cnt, &update_commit_map);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   return s;
 }
@@ -203,16 +292,36 @@ Status WritePreparedTxn::RollbackInternal() {
     ReadOptions roptions;
     WritePreparedTxnReadCallback callback;
     WriteBatch* rollback_batch_;
-    RollbackWriteBatchBuilder(DBImpl* db, WritePreparedTxnDB* wpt_db,
-                              SequenceNumber snap_seq, WriteBatch* dst_batch)
-        : db_(db), callback(wpt_db, snap_seq), rollback_batch_(dst_batch) {}
+    std::map<uint32_t, const Comparator*>& comparators_;
+    using CFKeys = std::set<Slice, SetComparator>;
+    std::map<uint32_t, CFKeys> keys_;
+    RollbackWriteBatchBuilder(
+        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
+        WriteBatch* dst_batch,
+        std::map<uint32_t, const Comparator*>& comparators)
+        : db_(db),
+          callback(wpt_db, snap_seq),
+          rollback_batch_(dst_batch),
+          comparators_(comparators) {}
 
     Status Rollback(uint32_t cf, const Slice& key) {
+      Status s;
+      CFKeys& cf_keys = keys_[cf];
+      if (cf_keys.size() == 0) {  // just inserted
+        auto cmp = comparators_[cf];
+        keys_[cf] = CFKeys(SetComparator(cmp));
+      }
+      auto it = cf_keys.insert(key);
+      if (it.second == false) {  // second is false if an element already
+                                 // existed.
+        return s;
+      }
+
       PinnableSlice pinnable_val;
       bool not_used;
       auto cf_handle = db_->GetColumnFamilyHandle(cf);
-      auto s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
-                            &callback);
+      s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
+                       &callback);
       assert(s.ok() || s.IsNotFound());
       if (s.ok()) {
         s = rollback_batch_->Put(cf_handle, key, pinnable_val);
@@ -254,7 +363,8 @@ Status WritePreparedTxn::RollbackInternal() {
    protected:
     virtual bool WriteAfterCommit() const override { return false; }
-  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch);
+  } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
+                     *wpt_db_->GetCFComparatorMap());
   auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
   assert(s.ok());
   if (!s.ok()) {
@@ -266,11 +376,12 @@ Status WritePreparedTxn::RollbackInternal() {
   const bool DISABLE_MEMTABLE = true;
   const uint64_t no_log_ref = 0;
   uint64_t seq_used = kMaxSequenceNumber;
-  const bool INCLUDES_DATA = true;
+  const size_t ZERO_PREPARES = 0;
+  const size_t ONE_BATCH = 1;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
+      wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH);
   s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
-                          no_log_ref, !DISABLE_MEMTABLE, &seq_used,
+                          no_log_ref, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           do_one_write ? &update_commit_map : nullptr);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {
@@ -289,13 +400,13 @@ Status WritePreparedTxn::RollbackInternal() {
   // Commit the batch by writing an empty batch to the queue that will release
   // the commit sequence number to readers.
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      wpt_db_, db_impl_, prepare_seq);
+      wpt_db_, db_impl_, prepare_seq, ONE_BATCH);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(&empty_batch);
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                          no_log_ref, DISABLE_MEMTABLE, &seq_used,
+                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Mark the txn as rolled back
@@ -334,6 +445,18 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                &snap_checker);
 }
 
+Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
+  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
+  prepare_batch_cnt_ = 1;
+  if (GetWriteBatch()->HasDuplicateKeys()) {
+    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
+    auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
+    assert(s.ok());
+    prepare_batch_cnt_ = counter.BatchCount();
+  }
+  return ret;
+}
+
 }  // namespace rocksdb
 
 #endif  // ROCKSDB_LITE
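A worked example of the SubBatchCounter logic defined above, simplified to one column family and std::string keys: the count starts at one and, whenever a key repeats within the current sub-batch, the counter increments and restarts its key set with the offending key.

```cpp
// Toy SubBatchCounter walkthrough; not the RocksDB handler.
#include <cstdio>
#include <set>
#include <string>
#include <vector>

int main() {
  std::vector<std::string> ops{"foo0", "foo0", "foo1", "foo2",
                               "foo0", "foo2", "foo3", "foo3"};
  std::set<std::string> keys;
  size_t batches = 1;
  for (const auto& k : ops) {
    if (!keys.insert(k).second) {  // duplicate in the current sub-batch
      batches++;
      keys.clear();
      keys.insert(k);  // the duplicate starts the next sub-batch
    }
  }
  // foo0 | foo0 foo1 foo2 | foo0 foo2 foo3 | foo3  -> 4 sub-batches
  std::printf("batch_cnt = %zu\n", batches);
}
```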
diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h
index 17f7d0c96f..de6a72ac9e 100644
--- a/utilities/transactions/write_prepared_txn.h
+++ b/utilities/transactions/write_prepared_txn.h
@@ -68,7 +68,7 @@ class WritePreparedTxn : public PessimisticTransaction {
 
   Status CommitWithoutPrepareInternal() override;
 
-  Status CommitBatchInternal(WriteBatch* batch) override;
+  Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
 
   // Since the data is already written to memtables at the Prepare phase, the
   // commit entails writing only a commit marker in the WAL. The sequence
   // number
@@ -84,11 +84,15 @@ class WritePreparedTxn : public PessimisticTransaction {
                           const Slice& key,
                           SequenceNumber* tracked_at_seq) override;
 
+  virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
+
   // No copying allowed
   WritePreparedTxn(const WritePreparedTxn&);
   void operator=(const WritePreparedTxn&);
 
   WritePreparedTxnDB* wpt_db_;
+  // Number of sub-batches in prepare
+  size_t prepare_batch_cnt_ = 0;
 };
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 867b757da5..79a3b727b7 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -11,7 +11,6 @@
 
 #include "utilities/transactions/write_prepared_txn_db.h"
 
-#include <algorithm>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -49,6 +48,20 @@ Status WritePreparedTxnDB::Initialize(
   return s;
 }
 
+Status WritePreparedTxnDB::VerifyCFOptions(
+    const ColumnFamilyOptions& cf_options) {
+  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
+  if (!s.ok()) {
+    return s;
+  }
+  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
+    return Status::InvalidArgument(
+        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
+        "WritePrepared transactions");
+  }
+  return Status::OK();
+}
+
 Transaction* WritePreparedTxnDB::BeginTransaction(
     const WriteOptions& write_options, const TransactionOptions& txn_options,
     Transaction* old_txn) {
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 4e83609fed..0b096e4c23 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -205,6 +205,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // Struct to hold ownership of snapshot and read callback for cleanup.
   struct IteratorState;
 
+ protected:
+  virtual Status VerifyCFOptions(
+      const ColumnFamilyOptions& cf_options) override;
+
  private:
   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
   friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
@@ -401,30 +405,48 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  public:
   // includes_data indicates that the commit also writes non-empty
   // CommitTimeWriteBatch to memtable, which needs to be committed separately.
-  WritePreparedCommitEntryPreReleaseCallback(
-      WritePreparedTxnDB* db, DBImpl* db_impl,
-      SequenceNumber prep_seq = kMaxSequenceNumber, bool includes_data = false)
+  WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
+                                             DBImpl* db_impl,
+                                             SequenceNumber prep_seq,
+                                             size_t prep_batch_cnt,
+                                             size_t data_batch_cnt = 0)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
-        includes_data_(includes_data) {}
+        prep_batch_cnt_(prep_batch_cnt),
+        data_batch_cnt_(data_batch_cnt),
+        includes_data_(data_batch_cnt_ > 0) {
+    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
+    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
+  }
 
-  virtual Status Callback(SequenceNumber commit_seq) {
+  virtual Status Callback(SequenceNumber commit_seq) override {
     assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
+    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
+                                         ? commit_seq
+                                         : commit_seq + data_batch_cnt_ - 1;
     if (prep_seq_ != kMaxSequenceNumber) {
-      db_->AddCommitted(prep_seq_, commit_seq);
+      for (size_t i = 0; i < prep_batch_cnt_; i++) {
+        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+      }
     }  // else there was no prepare phase
     if (includes_data_) {
+      assert(data_batch_cnt_);
       // Commit the data that is accompnaied with the commit request
       const bool PREPARE_SKIPPED = true;
-      db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
+      for (size_t i = 0; i < data_batch_cnt_; i++) {
+        // For commit seq of each batch use the commit seq of the last batch.
+        // This would make debugging easier by having all the batches carry
+        // the same sequence number.
+        db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
+      }
     }
     if (db_impl_->immutable_db_options().two_write_queues) {
       // Publish the sequence number. We can do that here assuming the
       // callback is invoked only from one write queue, which would guarantee
       // that the publish sequence numbers will be in order, i.e., once a seq
       // is published all the seq prior to that are also publishable.
-      db_impl_->SetLastPublishedSequence(commit_seq);
+      db_impl_->SetLastPublishedSequence(last_commit_seq);
     }
     // else SequenceNumber that is updated as part of the write already does
     // the publishing
@@ -436,6 +458,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   DBImpl* db_impl_;
   // kMaxSequenceNumber if there was no prepare phase
   SequenceNumber prep_seq_;
+  size_t prep_batch_cnt_;
+  size_t data_batch_cnt_;
   // Either because it is commit without prepare or it has a
   // CommitTimeWriteBatch
   bool includes_data_;
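The callback arithmetic above is easiest to see with numbers. Suppose a transaction prepared prep_batch_cnt = 2 sub-batches at sequences 10-11 and commits with a CommitTimeWriteBatch of data_batch_cnt = 3 sub-batches whose first sequence is commit_seq = 20; every entry is then committed with the last commit sub-batch's sequence, 22. A standalone sketch of exactly that computation:

```cpp
// Arithmetic of last_commit_seq and the AddCommitted loops; toy code only.
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t prep_seq = 10;
  const size_t prep_batch_cnt = 2;
  const uint64_t commit_seq = 20;
  const size_t data_batch_cnt = 3;

  const uint64_t last_commit_seq =
      data_batch_cnt <= 1 ? commit_seq : commit_seq + data_batch_cnt - 1;  // 22

  for (size_t i = 0; i < prep_batch_cnt; i++) {  // prepared sub-batches
    std::printf("AddCommitted(%llu, %llu)\n",
                (unsigned long long)(prep_seq + i),
                (unsigned long long)last_commit_seq);  // 10->22, 11->22
  }
  for (size_t i = 0; i < data_batch_cnt; i++) {  // commit-time sub-batches
    std::printf("AddCommitted(%llu, %llu)\n",
                (unsigned long long)(commit_seq + i),
                (unsigned long long)last_commit_seq);  // 20..22 -> 22
  }
}
```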
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 6e36c430f0..6f26c13286 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -582,67 +582,8 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
 
 WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
 
-bool WriteBatchWithIndex::Collapse() {
-  if (rep->obsolete_offsets.size() == 0) {
-    return false;
-  }
-  std::sort(rep->obsolete_offsets.begin(), rep->obsolete_offsets.end());
-  WriteBatch& write_batch = rep->write_batch;
-  assert(write_batch.Count() != 0);
-  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
-  Slice input(write_batch.Data());
-  input.remove_prefix(offset);
-  std::string collapsed_buf;
-  collapsed_buf.resize(WriteBatchInternal::kHeader);
-
-  size_t count = 0;
-  Status s;
-  // Loop through all entries in the write batch and add keep them if they are
-  // not obsolete by a newere entry.
-  while (s.ok() && !input.empty()) {
-    Slice key, value, blob, xid;
-    uint32_t column_family_id = 0;  // default
-    char tag = 0;
-    // set offset of current entry for call to AddNewEntry()
-    size_t last_entry_offset = input.data() - write_batch.Data().data();
-    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
-                                 &value, &blob, &xid);
-    if (!rep->obsolete_offsets.empty() &&
-        rep->obsolete_offsets.front() == last_entry_offset) {
-      rep->obsolete_offsets.erase(rep->obsolete_offsets.begin());
-      continue;
-    }
-    switch (tag) {
-      case kTypeColumnFamilyValue:
-      case kTypeValue:
-      case kTypeColumnFamilyDeletion:
-      case kTypeDeletion:
-      case kTypeColumnFamilySingleDeletion:
-      case kTypeSingleDeletion:
-      case kTypeColumnFamilyMerge:
-      case kTypeMerge:
-        count++;
-        break;
-      case kTypeLogData:
-      case kTypeBeginPrepareXID:
-      case kTypeBeginPersistedPrepareXID:
-      case kTypeEndPrepareXID:
-      case kTypeCommitXID:
-      case kTypeRollbackXID:
-      case kTypeNoop:
-        break;
-      default:
-        assert(0);
-    }
-    size_t entry_offset = input.data() - write_batch.Data().data();
-    const std::string& wb_data = write_batch.Data();
-    Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
-                            entry_offset - last_entry_offset);
-    collapsed_buf.append(entry_ptr.data(), entry_ptr.size());
-  }
-  write_batch.rep_ = std::move(collapsed_buf);
-  WriteBatchInternal::SetCount(&write_batch, static_cast<int>(count));
-  return true;
+bool WriteBatchWithIndex::HasDuplicateKeys() {
+  return rep->obsolete_offsets.size() > 0;
 }
 
 WBWIIterator* WriteBatchWithIndex::NewIterator() {
@@ -758,15 +699,7 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
   rep->SetLastEntryOffset();
   auto s = rep->write_batch.Merge(column_family, key, value);
   if (s.ok()) {
-    auto size_before = rep->obsolete_offsets.size();
     rep->AddOrUpdateIndex(column_family, key);
-    auto size_after = rep->obsolete_offsets.size();
-    bool duplicate_key = size_before != size_after;
-    if (!allow_dup_merge_ && duplicate_key) {
-      assert(0);
-      return Status::NotSupported(
-          "Duplicate key with merge value is not supported yet");
-    }
   }
   return s;
 }
@@ -775,15 +708,7 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
   rep->SetLastEntryOffset();
   auto s = rep->write_batch.Merge(key, value);
   if (s.ok()) {
-    auto size_before = rep->obsolete_offsets.size();
     rep->AddOrUpdateIndex(key);
-    auto size_after = rep->obsolete_offsets.size();
-    bool duplicate_key = size_before != size_after;
-    if (!allow_dup_merge_ && duplicate_key) {
-      assert(0);
-      return Status::NotSupported(
-          "Duplicate key with merge value is not supported yet");
-    }
   }
   return s;
 }
@@ -958,8 +883,9 @@ Status WriteBatchWithIndex::RollbackToSavePoint() {
   Status s = rep->write_batch.RollbackToSavePoint();
 
   if (s.ok()) {
-    s = rep->ReBuildIndex();
+    // obsolete_offsets will be rebuilt by ReBuildIndex
     rep->obsolete_offsets.clear();
+    s = rep->ReBuildIndex();
   }
 
   return s;
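Finally, a toy model of the RollbackToSavePoint ordering fixed above: obsolete_offsets must be cleared *before* ReBuildIndex, because the rebuild replays the surviving entries through the normal insert path, which regenerates duplicate-key evidence for the prefix that remains (this mirrors the save-point case added to the DuplicateKeys test earlier in this diff). ToyWBWI below is an illustrative stand-in, not the real Rep:

```cpp
// Toy save-point rollback preserving duplicate detection; not RocksDB code.
#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct ToyWBWI {
  std::vector<std::string> entries;      // the write batch payload
  std::map<std::string, size_t> index;   // key -> latest entry offset
  std::vector<size_t> obsolete_offsets;  // duplicate-key evidence

  void Put(const std::string& key) {
    auto it = index.find(key);
    if (it != index.end()) obsolete_offsets.push_back(it->second);
    index[key] = entries.size();
    entries.push_back(key);
  }
  void RollbackToSavePoint(size_t sp) {
    entries.resize(sp);        // drop the suffix written after the savepoint
    index.clear();
    obsolete_offsets.clear();  // must precede the rebuild
    std::vector<std::string> replay(entries);
    entries.clear();
    for (auto& k : replay) Put(k);  // the ReBuildIndex equivalent
  }
  bool HasDuplicateKeys() const { return !obsolete_offsets.empty(); }
};

int main() {
  ToyWBWI b;
  b.Put("foo0");
  b.Put("foo0");  // duplicate written before the savepoint
  size_t sp = b.entries.size();
  b.Put("bar");
  b.RollbackToSavePoint(sp);
  std::printf("dup=%d\n", b.HasDuplicateKeys() ? 1 : 0);  // still dup=1
}
```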