diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 413ecab08d..7f15a186c3 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -91,6 +91,7 @@ CompactionIterator::CompactionIterator( ignore_snapshots_ = false; } input_->SetPinnedItersMgr(&pinned_iters_mgr_); + TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); } CompactionIterator::~CompactionIterator() { diff --git a/db/db_impl.cc b/db/db_impl.cc index cdc66701be..ba9d7ff706 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1960,21 +1960,32 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { } #endif // ROCKSDB_LITE -SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { +SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, + bool lock) { int64_t unix_time = 0; env_->GetCurrentTime(&unix_time); // Ignore error SnapshotImpl* s = new SnapshotImpl; - InstrumentedMutexLock l(&mutex_); + if (lock) { + mutex_.Lock(); + } // returns null if the underlying memtable does not support snapshot. if (!is_snapshot_supported_) { + if (lock) { + mutex_.Unlock(); + } delete s; return nullptr; } auto snapshot_seq = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); - return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + SnapshotImpl* snapshot = + snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + if (lock) { + mutex_.Unlock(); + } + return snapshot; } void DBImpl::ReleaseSnapshot(const Snapshot* s) { diff --git a/db/db_impl.h b/db/db_impl.h index 166dc6abe7..4b663cf238 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -700,6 +700,12 @@ class DBImpl : public DB { void SetSnapshotChecker(SnapshotChecker* snapshot_checker); + // Fill JobContext with snapshot information needed by flush and compaction. + void GetSnapshotContext(JobContext* job_context, + std::vector* snapshot_seqs, + SequenceNumber* earliest_write_conflict_snapshot, + SnapshotChecker** snapshot_checker); + // Not thread-safe. void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); @@ -1148,7 +1154,8 @@ class DBImpl : public DB { // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); - SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary); + SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, + bool lock = true); uint64_t GetMaxTotalWalSize() const; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index dccc73debd..d0feb9c40d 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -137,14 +137,12 @@ Status DBImpl::FlushMemTableToOutputFile( assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(), @@ -287,14 +285,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } #endif /* !NDEBUG */ + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } autovector distinct_output_dirs; std::vector jobs; std::vector all_mutable_cf_options; @@ -936,17 +932,15 @@ Status DBImpl::CompactFilesImpl( // deletion compaction currently not allowed in CompactFiles. assert(!c->deletion_compaction()); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJobStats compaction_job_stats; CompactionJob compaction_job( @@ -2576,14 +2570,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, output_level = c->output_level(); TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", &output_level); + std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; - std::vector snapshot_seqs = - snapshots_.GetAll(&earliest_write_conflict_snapshot); - - auto snapshot_checker = snapshot_checker_.get(); - if (use_custom_gc_ && snapshot_checker == nullptr) { - snapshot_checker = DisableGCSnapshotChecker::Instance(); - } + SnapshotChecker* snapshot_checker; + GetSnapshotContext(job_context, &snapshot_seqs, + &earliest_write_conflict_snapshot, &snapshot_checker); assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, @@ -2914,4 +2905,31 @@ void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { assert(!snapshot_checker_); snapshot_checker_.reset(snapshot_checker); } + +void DBImpl::GetSnapshotContext( + JobContext* job_context, std::vector* snapshot_seqs, + SequenceNumber* earliest_write_conflict_snapshot, + SnapshotChecker** snapshot_checker_ptr) { + mutex_.AssertHeld(); + assert(job_context != nullptr); + assert(snapshot_seqs != nullptr); + assert(earliest_write_conflict_snapshot != nullptr); + assert(snapshot_checker_ptr != nullptr); + + *snapshot_checker_ptr = snapshot_checker_.get(); + if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) { + *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance(); + } + if (*snapshot_checker_ptr != nullptr) { + // If snapshot_checker is used, that means the flush/compaction may + // contain values not visible to snapshot taken after + // flush/compaction job starts. Take a snapshot and it will appear + // in snapshot_seqs and force compaction iterator to consider such + // snapshots. + const Snapshot* job_snapshot = + GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/); + job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot)); + } + *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot); +} } // namespace rocksdb diff --git a/db/job_context.h b/db/job_context.h index 498ef7d176..3978fad33c 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -175,6 +175,9 @@ struct JobContext { size_t num_alive_log_files = 0; uint64_t size_log_to_delete = 0; + // Snapshot taken before flush/compaction job. + std::unique_ptr job_snapshot; + explicit JobContext(int _job_id, bool create_superversion = false) { job_id = _job_id; manifest_file_number = 0; @@ -204,6 +207,7 @@ struct JobContext { memtables_to_free.clear(); logs_to_free.clear(); + job_snapshot.reset(); } ~JobContext() { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index d0085de408..2ddfb9758b 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2374,6 +2374,34 @@ TEST_P(WritePreparedTransactionTest, delete transaction; } +TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) { + options.disable_auto_compactions = true; + ReOpen(); + + const Snapshot* snapshot = nullptr; + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); + auto* txn = db->BeginTransaction(WriteOptions()); + ASSERT_OK(txn->SetName("txn")); + ASSERT_OK(txn->Put("key1", "value2")); + ASSERT_OK(txn->Prepare()); + + auto callback = [&](void*) { + // Snapshot is taken after compaction start. It should be taken into + // consideration for whether to compact out value1. + snapshot = db->GetSnapshot(); + ASSERT_OK(txn->Commit()); + delete txn; + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(db->Flush(FlushOptions())); + ASSERT_NE(nullptr, snapshot); + VerifyKeys({{"key1", "value2"}}); + VerifyKeys({{"key1", "value1"}}, snapshot); + db->ReleaseSnapshot(snapshot); +} + TEST_P(WritePreparedTransactionTest, Iterate) { auto verify_state = [](Iterator* iter, const std::string& key, const std::string& value) {