WritePrepared: commit only from the 2nd queue (#5014)

Summary:
When two_write_queues is enabled, ::AddPrepared is called only from the main queue, which writes to both the WAL and the memtable, and ::AddCommitted is called only from the 2nd queue, which writes to the WAL only. This simplifies the logic by eliminating concurrent calls to AddPrepared and, likewise, to AddCommitted. The patch fixes one case that did not conform to this rule, which enables future refactoring; for example, AdvanceMaxEvictedSeq, which is invoked by AddCommitted, can be simplified by assuming it is never called concurrently.
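
To make the queue-routing rule concrete, here is a minimal, self-contained C++ sketch, not the RocksDB implementation: TxnBookkeeping and OnPreRelease are hypothetical stand-ins for WritePreparedTxnDB's bookkeeping and for the pre-release callbacks. It only illustrates the invariant the patch enforces: with two_write_queues, a pre-release callback issued from the main (memtable-enabled) queue may only call AddPrepared, and one issued from the 2nd (WAL-only) queue may only call AddCommitted, so neither function ever races with itself.

// Hypothetical illustration of the queue-routing rule; names are not RocksDB's.
#include <cassert>
#include <cstdint>
#include <iostream>

struct TxnBookkeeping {
  // In the real code these mutate PrepareHeap / the commit map; here they
  // only trace the call so the routing rule is visible.
  void AddPrepared(uint64_t seq) {
    std::cout << "AddPrepared(" << seq << ")\n";
  }
  void AddCommitted(uint64_t prep_seq, uint64_t commit_seq) {
    std::cout << "AddCommitted(" << prep_seq << ", " << commit_seq << ")\n";
  }
};

// With two_write_queues enabled:
//  - main queue (writes WAL + memtable) -> AddPrepared only
//  - 2nd queue (writes WAL only)        -> AddCommitted only
// Each function therefore only ever runs on one queue and needs no
// protection against concurrent calls to itself.
void OnPreRelease(TxnBookkeeping* db, bool two_write_queues,
                  bool is_mem_disabled, bool is_commit, uint64_t prep_seq,
                  uint64_t commit_seq) {
  if (is_commit) {
    // Commit callbacks must come from the 2nd queue.
    assert(!two_write_queues || is_mem_disabled);
    db->AddCommitted(prep_seq, commit_seq);
  } else {
    // Prepare callbacks must come from the main queue.
    assert(!two_write_queues || !is_mem_disabled);
    db->AddPrepared(prep_seq);
  }
}

int main() {
  TxnBookkeeping db;
  const bool two_write_queues = true;
  // Prepare batch goes through the main queue (memtable enabled).
  OnPreRelease(&db, two_write_queues, /*is_mem_disabled=*/false,
               /*is_commit=*/false, /*prep_seq=*/10, /*commit_seq=*/0);
  // Commit marker goes through the 2nd, WAL-only queue (memtable disabled).
  OnPreRelease(&db, two_write_queues, /*is_mem_disabled=*/true,
               /*is_commit=*/true, /*prep_seq=*/10, /*commit_seq=*/12);
  return 0;
}

Under this rule AddCommitted, and hence the AdvanceMaxEvictedSeq it invokes, can assume single-threaded callers without extra synchronization.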
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5014

Differential Revision: D14210493

Pulled By: maysamyabandeh

fbshipit-source-id: 6db5ba372a294a568a14caa010576460917a4eab
Maysam Yabandeh 2019-02-28 15:20:40 -08:00 committed by Facebook Github Bot
parent 06ea73d60b
commit 68a2f94d5d
2 changed files with 59 additions and 44 deletions


@@ -87,10 +87,7 @@ Status WritePreparedTxn::PrepareInternal() {
                                      !WRITE_AFTER_COMMIT);
   // For each duplicate key we account for a new sub-batch
   prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
-  // AddPrepared better to be called in the pre-release callback otherwise there
-  // is a non-zero chance of max advancing prepare_seq and readers assume the
-  // data as committed.
-  // Also having it in the PreReleaseCallback allows in-order addition of
+  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
   // prepared entries to PrepareHeap and hence enables an optimization. Refer to
   // SmallestUnCommittedSeq for more details.
   AddPreparedCallback add_prepared_callback(
@@ -151,14 +148,18 @@ Status WritePreparedTxn::CommitInternal() {
   const bool disable_memtable = !includes_data;
   const bool do_one_write =
       !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
-  const bool publish_seq = do_one_write;
-  // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
-  // DB in one shot. min_uncommitted still works since it requires capturing
-  // data that is written to DB but not yet committed, while
-  // CommitTimeWriteBatch commits with PreReleaseCallback.
   WritePreparedCommitEntryPreReleaseCallback update_commit_map(
-      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
-      publish_seq);
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
+  // This is to call AddPrepared on CommitTimeWriteBatch
+  AddPreparedCallback add_prepared_callback(
+      wpt_db_, commit_batch_cnt,
+      db_impl_->immutable_db_options().two_write_queues);
+  PreReleaseCallback* pre_release_callback;
+  if (do_one_write) {
+    pre_release_callback = &update_commit_map;
+  } else {
+    pre_release_callback = &add_prepared_callback;
+  }
   uint64_t seq_used = kMaxSequenceNumber;
   // Since the prepared batch is directly written to memtable, there is already
   // a connection between the memtable and its WAL, so there is no need to
@@ -167,37 +168,29 @@ Status WritePreparedTxn::CommitInternal() {
   size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
   auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                                zero_log_number, disable_memtable, &seq_used,
-                               batch_cnt, &update_commit_map);
+                               batch_cnt, pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
+  const SequenceNumber commit_batch_seq = seq_used;
   if (LIKELY(do_one_write || !s.ok())) {
     if (LIKELY(s.ok())) {
       // Note RemovePrepared should be called after WriteImpl that publishsed
       // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
       wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
     }
+    if (UNLIKELY(!do_one_write)) {
+      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
+    }
     return s;
   }  // else do the 2nd write to publish seq
   // Note: the 2nd write comes with a performance penality. So if we have too
   // many of commits accompanied with ComitTimeWriteBatch and yet we cannot
   // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
   // two_write_queues should be disabled to avoid many additional writes here.
-  class PublishSeqPreReleaseCallback : public PreReleaseCallback {
-   public:
-    explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
-        : db_impl_(db_impl) {}
-    Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
-#ifdef NDEBUG
-      (void)is_mem_disabled;
-#endif
-      assert(is_mem_disabled);
-      assert(db_impl_->immutable_db_options().two_write_queues);
-      db_impl_->SetLastPublishedSequence(seq);
-      return Status::OK();
-    }
-
-   private:
-    DBImpl* db_impl_;
-  } publish_seq_callback(db_impl_);
+  const size_t kZeroData = 0;
+  // Update commit map only from the 2nd queue
+  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
+      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
+      commit_batch_seq, commit_batch_cnt);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   // In the absence of Prepare markers, use Noop as a batch separator
@@ -207,11 +200,12 @@ Status WritePreparedTxn::CommitInternal() {
   const uint64_t NO_REF_LOG = 0;
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &publish_seq_callback);
+                          &update_commit_map_with_aux_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Note RemovePrepared should be called after WriteImpl that publishsed the
   // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
   wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
+  wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
   return s;
 }


@@ -321,12 +321,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     return false;
   }
-  // Add the transaction with prepare sequence seq to the prepared list
+  // Add the transaction with prepare sequence seq to the prepared list.
+  // Note: must be called serially with increasing seq on each call.
   void AddPrepared(uint64_t seq);
   // Remove the transaction with prepare sequence seq from the prepared list
   void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
-  // Add the transaction with prepare sequence prepare_seq and comtit sequence
+  // Add the transaction with prepare sequence prepare_seq and commit sequence
   // commit_seq to the commit map. loop_cnt is to detect infinite loops.
+  // Note: must be called serially.
   void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                     uint8_t loop_cnt = 0);
@@ -752,6 +754,7 @@ class AddPreparedCallback : public PreReleaseCallback {
 #ifdef NDEBUG
     (void)is_mem_disabled;
 #endif
+    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
     for (size_t i = 0; i < sub_batch_cnt_; i++) {
       db_->AddPrepared(prepare_seq + i);
@@ -769,21 +772,22 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
  public:
   // includes_data indicates that the commit also writes non-empty
   // CommitTimeWriteBatch to memtable, which needs to be committed separately.
-  WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
-                                             DBImpl* db_impl,
-                                             SequenceNumber prep_seq,
-                                             size_t prep_batch_cnt,
-                                             size_t data_batch_cnt = 0,
-                                             bool publish_seq = true)
+  WritePreparedCommitEntryPreReleaseCallback(
+      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
+      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
+      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
         prep_batch_cnt_(prep_batch_cnt),
         data_batch_cnt_(data_batch_cnt),
         includes_data_(data_batch_cnt_ > 0),
-        publish_seq_(publish_seq) {
+        aux_seq_(aux_seq),
+        aux_batch_cnt_(aux_batch_cnt),
+        includes_aux_batch_(aux_batch_cnt > 0) {
     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
     assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
+    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
   }

   virtual Status Callback(SequenceNumber commit_seq,
@@ -791,7 +795,12 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 #ifdef NDEBUG
     (void)is_mem_disabled;
 #endif
+    // Always commit from the 2nd queue
+    assert(!db_impl_->immutable_db_options().two_write_queues ||
+           is_mem_disabled);
     assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
+    // Data batch is what accompanied with the commit marker and affects the
+    // last seq in the commit batch.
     const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                          ? commit_seq
                                          : commit_seq + data_batch_cnt_ - 1;
@@ -800,6 +809,11 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
         db_->AddCommitted(prep_seq_ + i, last_commit_seq);
       }
     }  // else there was no prepare phase
+    if (includes_aux_batch_) {
+      for (size_t i = 0; i < aux_batch_cnt_; i++) {
+        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
+      }
+    }
     if (includes_data_) {
       assert(data_batch_cnt_);
       // Commit the data that is accompanied with the commit request
@@ -810,7 +824,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
         db_->AddCommitted(commit_seq + i, last_commit_seq);
       }
     }
-    if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
+    if (db_impl_->immutable_db_options().two_write_queues) {
       assert(is_mem_disabled);  // implies the 2nd queue
       // Publish the sequence number. We can do that here assuming the callback
       // is invoked only from one write queue, which would guarantee that the
@@ -830,11 +844,16 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   SequenceNumber prep_seq_;
   size_t prep_batch_cnt_;
   size_t data_batch_cnt_;
-  // Either because it is commit without prepare or it has a
-  // CommitTimeWriteBatch
+  // Data here is the batch that is written with the commit marker, either
+  // because it is commit without prepare or commit has a CommitTimeWriteBatch.
   bool includes_data_;
-  // Should the callback also publishes the commit seq number
-  bool publish_seq_;
+  // Auxiliary batch (if there is any) is a batch that is written before, but
+  // gets the same commit seq as prepare batch or data batch. This is used in
+  // two write queues where the CommitTimeWriteBatch becomes the aux batch and
+  // we do a separate write to actually commit everything.
+  SequenceNumber aux_seq_;
+  size_t aux_batch_cnt_;
+  bool includes_aux_batch_;
 };

 // For two_write_queues commit both the aborted batch and the cleanup batch and
@@ -858,7 +877,9 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
   virtual Status Callback(SequenceNumber commit_seq,
                           bool is_mem_disabled) override {
+    // Always commit from the 2nd queue
     assert(is_mem_disabled);  // implies the 2nd queue
+    assert(db_impl_->immutable_db_options().two_write_queues);
 #ifdef NDEBUG
     (void)is_mem_disabled;
 #endif