diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 9f84fe905a..ddd0aa6532 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -146,17 +146,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (write_thread_.CompleteParallelMemTableWriter(&w)) { // we're responsible for exit batch group - for (auto* writer : *(w.write_group)) { - if (!writer->CallbackFailed() && writer->pre_release_callback) { - assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence, - disable_memtable); - if (!ws.ok()) { - status = ws; - break; - } - } - } // TODO(myabandeh): propagate status to write_group auto last_sequence = w.write_group->last_sequence; versions_->SetLastSequence(last_sequence); @@ -309,6 +298,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, const SequenceNumber current_sequence = last_sequence + 1; last_sequence += seq_inc; + // PreReleaseCallback is called after WAL write and before memtable write + if (status.ok()) { + SequenceNumber next_sequence = current_sequence; + // Note: the logic for advancing seq here must be consistent with the + // logic in WriteBatchInternal::InsertInto(write_group...) as well as + // with WriteBatchInternal::InsertInto(write_batch...) that is called on + // the merged batch during recovery from the WAL. + for (auto* writer : write_group) { + if (writer->CallbackFailed()) { + continue; + } + writer->sequence = next_sequence; + if (writer->pre_release_callback) { + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); + if (!ws.ok()) { + status = ws; + break; + } + } + if (seq_per_batch_) { + assert(writer->batch_cnt); + next_sequence += writer->batch_cnt; + } else if (writer->ShouldWriteToMemtable()) { + next_sequence += WriteBatchInternal::Count(writer->batch); + } + } + } + if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); @@ -320,23 +338,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, batch_per_txn_); } else { - SequenceNumber next_sequence = current_sequence; - // Note: the logic for advancing seq here must be consistent with the - // logic in WriteBatchInternal::InsertInto(write_group...) as well as - // with WriteBatchInternal::InsertInto(write_batch...) that is called on - // the merged batch during recovery from the WAL. - for (auto* writer : write_group) { - if (writer->CallbackFailed()) { - continue; - } - writer->sequence = next_sequence; - if (seq_per_batch_) { - assert(writer->batch_cnt); - next_sequence += writer->batch_cnt; - } else if (writer->ShouldWriteToMemtable()) { - next_sequence += WriteBatchInternal::Count(writer->batch); - } - } write_group.last_sequence = last_sequence; write_thread_.LaunchParallelMemTableWriters(&write_group); in_parallel_group = true; @@ -388,17 +389,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (should_exit_batch_group) { if (status.ok()) { - for (auto* writer : write_group) { - if (!writer->CallbackFailed() && writer->pre_release_callback) { - assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence, - disable_memtable); - if (!ws.ok()) { - status = ws; - break; - } - } - } + // Note: if we are to resume after non-OK statuses we need to revisit how + // we reacts to non-OK statuses here. versions_->SetLastSequence(last_sequence); } MemTableInsertStatusCheck(w.status); diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h index 0cee4d5477..09564ba33b 100644 --- a/db/pre_release_callback.h +++ b/db/pre_release_callback.h @@ -15,8 +15,8 @@ class PreReleaseCallback { public: virtual ~PreReleaseCallback() {} - // Will be called while on the write thread after the write and before the - // release of the sequence number. This is useful if any operation needs to be + // Will be called while on the write thread after the write to the WAL and + // before the write to memtable. This is useful if any operation needs to be // done before the write gets visible to the readers, or if we want to reduce // the overhead of locking by updating something sequentially while we are on // the write thread. If the callback fails, this function returns a non-OK