diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e0140707d9..9e01f5b031 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4809,19 +4809,6 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { - int consumed_seqno_count = - ingestion_jobs[0].ConsumedSequenceNumbersCount(); - for (size_t i = 1; i != num_cfs; ++i) { - consumed_seqno_count = - std::max(consumed_seqno_count, - ingestion_jobs[i].ConsumedSequenceNumbersCount()); - } - if (consumed_seqno_count > 0) { - const SequenceNumber last_seqno = versions_->LastSequence(); - versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); - versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); - versions_->SetLastSequence(last_seqno + consumed_seqno_count); - } autovector<ColumnFamilyData*> cfds_to_commit; autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<autovector<VersionEdit*>> edit_lists; @@ -4851,6 +4838,27 @@ Status DBImpl::IngestExternalFiles( status = versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list, edit_lists, &mutex_, directories_.GetDbDir()); + // It is safe to update VersionSet last seqno here after LogAndApply since + // LogAndApply persists last sequence number from VersionEdits, + // which are from file's largest seqno and not from VersionSet. + // + // It is necessary to update last seqno here since LogAndApply releases + // mutex when persisting MANIFEST file, and the snapshots taken during + // that period will not be stable if VersionSet last seqno is updated + // before LogAndApply. 
+ int consumed_seqno_count = + ingestion_jobs[0].ConsumedSequenceNumbersCount(); + for (size_t i = 1; i != num_cfs; ++i) { + consumed_seqno_count = + std::max(consumed_seqno_count, + ingestion_jobs[i].ConsumedSequenceNumbersCount()); + } + if (consumed_seqno_count > 0) { + const SequenceNumber last_seqno = versions_->LastSequence(); + versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastSequence(last_seqno + consumed_seqno_count); + } } if (status.ok()) { diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 8001a4a4a4..57ba677a80 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1923,6 +1923,63 @@ TEST_F(ExternalSSTFileBasicTest, VerifySstUniqueId) { ASSERT_EQ(skipped, 1); } +TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) { + const std::string kPutVal = "put_val"; + const std::string kIngestedVal = "ingested_val"; + + ASSERT_OK(Put("k", kPutVal, WriteOptions())); + ASSERT_OK(Flush()); + + std::string external_file = sst_files_dir_ + "/file_to_ingest.sst"; + { + SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()}; + ASSERT_OK(sst_file_writer.Open(external_file)); + ASSERT_OK(sst_file_writer.Put("k", kIngestedVal)); + ASSERT_OK(sst_file_writer.Finish()); + } + + const Snapshot* snapshot = nullptr; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", [&](void* /* arg */) { + // Prevent memory leak: this callback may be called multiple times + // and the previous snapshot needs to be freed + db_->ReleaseSnapshot(snapshot); + snapshot = db_->GetSnapshot(); + ReadOptions read_opts; + read_opts.snapshot = snapshot; + std::string value; + ASSERT_OK(db_->Get(read_opts, "k", &value)); + ASSERT_EQ(kPutVal, value); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + 
ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file}, + IngestExternalFileOptions())); + auto ingested_file_seqno = db_->GetLatestSequenceNumber(); + ASSERT_NE(nullptr, snapshot); + // snapshot is taken before SST ingestion is done + ASSERT_EQ(ingested_file_seqno, snapshot->GetSequenceNumber() + 1); + + ReadOptions read_opts; + read_opts.snapshot = snapshot; + std::string value; + ASSERT_OK(db_->Get(read_opts, "k", &value)); + ASSERT_EQ(kPutVal, value); + + db_->ReleaseSnapshot(snapshot); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + // After reopen, sequence number should be up to date such that + // the ingested value is read + Reopen(CurrentOptions()); + ASSERT_OK(db_->Get(ReadOptions(), "k", &value)); + ASSERT_EQ(kIngestedVal, value); + + // New write should get higher seqno compared to ingested file + ASSERT_OK(Put("k", kPutVal, WriteOptions())); + ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false),