From b0e190604bacb2dd47f71260b78a9f93df5842f3 Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Wed, 25 May 2022 10:05:17 -0700 Subject: [PATCH] Update VersionSet last seqno after LogAndApply (#10051) Summary: This PR fixes the issue of unstable snapshots during external SST file ingestion. Credit ajkr for the following walkthrough: consider these relevant steps of IngestExternalFile(): (1) increase seqno while holding mutex -- https://github.com/facebook/rocksdb/blob/677d2b4a8f8fd19d0c39a9ee8f648742e610688d/db/db_impl/db_impl.cc#L4768 (2) LogAndApply() -- https://github.com/facebook/rocksdb/blob/677d2b4a8f8fd19d0c39a9ee8f648742e610688d/db/db_impl/db_impl.cc#L4797-L4798 (a) write to MANIFEST with mutex released https://github.com/facebook/rocksdb/blob/a96a4a2f7ba7633ab2cc51defd1e923e20d239a6/db/version_set.cc#L4407 (b) apply to in-memory state with mutex held A snapshot taken during (2a) will be unstable. In particular, queries against that snapshot will not include data from the ingested file before (2b), and will include data from the ingested file after (2b). Pull Request resolved: https://github.com/facebook/rocksdb/pull/10051 Test Plan: Added a new unit test: `ExternalSSTFileBasicTest.StableSnapshotWhileLoggingToManifest`. 
``` make external_sst_file_basic_test ./external_sst_file_basic_test ``` Reviewed By: ajkr Differential Revision: D36654033 Pulled By: cbi42 fbshipit-source-id: bf720cca313e0cf211585960f3aff04853a31b96 --- db/db_impl/db_impl.cc | 34 +++++++++++------- db/external_sst_file_basic_test.cc | 57 ++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e0140707d9..9e01f5b031 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4809,19 +4809,6 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { - int consumed_seqno_count = - ingestion_jobs[0].ConsumedSequenceNumbersCount(); - for (size_t i = 1; i != num_cfs; ++i) { - consumed_seqno_count = - std::max(consumed_seqno_count, - ingestion_jobs[i].ConsumedSequenceNumbersCount()); - } - if (consumed_seqno_count > 0) { - const SequenceNumber last_seqno = versions_->LastSequence(); - versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); - versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); - versions_->SetLastSequence(last_seqno + consumed_seqno_count); - } autovector cfds_to_commit; autovector mutable_cf_options_list; autovector> edit_lists; @@ -4851,6 +4838,27 @@ Status DBImpl::IngestExternalFiles( status = versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list, edit_lists, &mutex_, directories_.GetDbDir()); + // It is safe to update VersionSet last seqno here after LogAndApply since + // LogAndApply persists last sequence number from VersionEdits, + // which are from file's largest seqno and not from VersionSet. + // + // It is necessary to update last seqno here since LogAndApply releases + // mutex when persisting MANIFEST file, and the snapshots taken during + // that period will not be stable if VersionSet last seqno is updated + // before LogAndApply. 
+ int consumed_seqno_count = + ingestion_jobs[0].ConsumedSequenceNumbersCount(); + for (size_t i = 1; i != num_cfs; ++i) { + consumed_seqno_count = + std::max(consumed_seqno_count, + ingestion_jobs[i].ConsumedSequenceNumbersCount()); + } + if (consumed_seqno_count > 0) { + const SequenceNumber last_seqno = versions_->LastSequence(); + versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastSequence(last_seqno + consumed_seqno_count); + } } if (status.ok()) { diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 8001a4a4a4..57ba677a80 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1923,6 +1923,63 @@ TEST_F(ExternalSSTFileBasicTest, VerifySstUniqueId) { ASSERT_EQ(skipped, 1); } +TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) { + const std::string kPutVal = "put_val"; + const std::string kIngestedVal = "ingested_val"; + + ASSERT_OK(Put("k", kPutVal, WriteOptions())); + ASSERT_OK(Flush()); + + std::string external_file = sst_files_dir_ + "/file_to_ingest.sst"; + { + SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()}; + ASSERT_OK(sst_file_writer.Open(external_file)); + ASSERT_OK(sst_file_writer.Put("k", kIngestedVal)); + ASSERT_OK(sst_file_writer.Finish()); + } + + const Snapshot* snapshot = nullptr; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", [&](void* /* arg */) { + // Prevent memory leak: this callback may be called multiple times + // and previous snapshot need to be freed + db_->ReleaseSnapshot(snapshot); + snapshot = db_->GetSnapshot(); + ReadOptions read_opts; + read_opts.snapshot = snapshot; + std::string value; + ASSERT_OK(db_->Get(read_opts, "k", &value)); + ASSERT_EQ(kPutVal, value); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + 
ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file}, + IngestExternalFileOptions())); + auto ingested_file_seqno = db_->GetLatestSequenceNumber(); + ASSERT_NE(nullptr, snapshot); + // snapshot is taken before SST ingestion is done + ASSERT_EQ(ingested_file_seqno, snapshot->GetSequenceNumber() + 1); + + ReadOptions read_opts; + read_opts.snapshot = snapshot; + std::string value; + ASSERT_OK(db_->Get(read_opts, "k", &value)); + ASSERT_EQ(kPutVal, value); + + db_->ReleaseSnapshot(snapshot); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + // After reopen, sequence number should be up current such that + // ingested value is read + Reopen(CurrentOptions()); + ASSERT_OK(db_->Get(ReadOptions(), "k", &value)); + ASSERT_EQ(kIngestedVal, value); + + // New write should get higher seqno compared to ingested file + ASSERT_OK(Put("k", kPutVal, WriteOptions())); + ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false),