diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d9cd321f37..dbf6e46a23 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5961,9 +5961,9 @@ Status DBImpl::IngestExternalFiles( uint64_t start_file_number = next_file_number; for (size_t i = 1; i != num_cfs; ++i) { start_file_number += args[i - 1].external_files.size(); - auto* cfd = - static_cast(args[i].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + SuperVersion* super_version = + ingestion_jobs[i].GetColumnFamilyData()->GetReferencedSuperVersion( + this); Status es = ingestion_jobs[i].Prepare( args[i].external_files, args[i].files_checksums, args[i].files_checksum_func_names, args[i].file_temperature, @@ -5977,9 +5977,9 @@ Status DBImpl::IngestExternalFiles( TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0"); TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"); { - auto* cfd = - static_cast(args[0].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + SuperVersion* super_version = + ingestion_jobs[0].GetColumnFamilyData()->GetReferencedSuperVersion( + this); Status es = ingestion_jobs[0].Prepare( args[0].external_files, args[0].files_checksums, args[0].files_checksum_func_names, args[0].file_temperature, @@ -6030,8 +6030,7 @@ Status DBImpl::IngestExternalFiles( bool at_least_one_cf_need_flush = false; std::vector need_flush(num_cfs, false); for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); if (cfd->IsDropped()) { // TODO (yanqin) investigate whether we should abort ingestion or // proceed with other non-dropped column families. @@ -6063,12 +6062,10 @@ Status DBImpl::IngestExternalFiles( for (size_t i = 0; i != num_cfs; ++i) { if (need_flush[i]) { mutex_.Unlock(); - auto* cfd = - static_cast(args[i].column_family) - ->cfd(); - status = FlushMemTable(cfd, flush_opts, - FlushReason::kExternalFileIngestion, - true /* entered_write_thread */); + status = + FlushMemTable(ingestion_jobs[i].GetColumnFamilyData(), + flush_opts, FlushReason::kExternalFileIngestion, + true /* entered_write_thread */); mutex_.Lock(); if (!status.ok()) { break; @@ -6076,6 +6073,13 @@ Status DBImpl::IngestExternalFiles( } } } + if (status.ok()) { + for (size_t i = 0; i != num_cfs; ++i) { + if (immutable_db_options_.atomic_flush || need_flush[i]) { + ingestion_jobs[i].SetFlushedBeforeRun(); + } + } + } } // Run ingestion jobs. if (status.ok()) { @@ -6096,11 +6100,8 @@ Status DBImpl::IngestExternalFiles( autovector> edit_lists; uint32_t num_entries = 0; for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); - if (cfd->IsDropped()) { - continue; - } + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); + assert(!cfd->IsDropped()); cfds_to_commit.push_back(cfd); mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); autovector edit_list; @@ -6150,20 +6151,16 @@ Status DBImpl::IngestExternalFiles( if (status.ok()) { for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); - if (!cfd->IsDropped()) { - InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i], - *cfd->GetLatestMutableCFOptions()); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); + assert(!cfd->IsDropped()); + InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i], + *cfd->GetLatestMutableCFOptions()); #ifndef NDEBUG - if (0 == i && num_cfs > 1) { - TEST_SYNC_POINT( - "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0"); - TEST_SYNC_POINT( - "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"); - } -#endif // !NDEBUG + if (0 == i && num_cfs > 1) { + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:0"); + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"); } +#endif // !NDEBUG } } else if (versions_->io_status().IsIOError()) { // Error while writing to MANIFEST. @@ -6205,8 +6202,7 @@ Status DBImpl::IngestExternalFiles( } if (status.ok()) { for (size_t i = 0; i != num_cfs; ++i) { - auto* cfd = - static_cast(args[i].column_family)->cfd(); + auto* cfd = ingestion_jobs[i].GetColumnFamilyData(); if (!cfd->IsDropped()) { NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]); } diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 1c57102c3c..cccf84efde 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1818,6 +1818,9 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) { } IngestExternalFileOptions ifo; + ifo.allow_global_seqno = false; + ASSERT_NOK(db_->IngestExternalFile(files, ifo)); + ifo.allow_global_seqno = true; ASSERT_OK(db_->IngestExternalFile(files, ifo)); ASSERT_EQ(Get("a"), "a1"); ASSERT_EQ(Get("i"), "i2"); @@ -2575,6 +2578,57 @@ TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) { ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1); } +TEST_F(ExternalSSTFileBasicTest, ConcurrentIngestionAndDropColumnFamily) { + int kNumCFs = 10; + Options options = CurrentOptions(); + CreateColumnFamilies({"cf_0", "cf_1", "cf_2", "cf_3", "cf_4", "cf_5", "cf_6", + "cf_7", "cf_8", "cf_9"}, + options); + + IngestExternalFileArg ingest_arg; + IngestExternalFileOptions ifo; + std::string external_file = sst_files_dir_ + "/file_to_ingest.sst"; + SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()}; + ASSERT_OK(sst_file_writer.Open(external_file)); + ASSERT_OK(sst_file_writer.Put("key", "value")); + ASSERT_OK(sst_file_writer.Finish()); + ifo.move_files = false; + ingest_arg.external_files = {external_file}; + ingest_arg.options = ifo; + + std::vector threads; + threads.reserve(2 * kNumCFs); + std::atomic success_ingestion_count = 0; + std::atomic failed_ingestion_count = 0; + for (int i = 0; i < kNumCFs; i++) { + threads.emplace_back( + [this, i]() { ASSERT_OK(db_->DropColumnFamily(handles_[i])); }); + threads.emplace_back([this, i, ingest_arg, &success_ingestion_count, + &failed_ingestion_count]() { + IngestExternalFileArg arg_copy = ingest_arg; + arg_copy.column_family = handles_[i]; + Status s = db_->IngestExternalFiles({arg_copy}); + ReadOptions ropts; + std::string value; + if (s.ok()) { + ASSERT_OK(db_->Get(ropts, handles_[i], "key", &value)); + ASSERT_EQ("value", value); + success_ingestion_count.fetch_add(1); + } else { + ASSERT_TRUE(db_->Get(ropts, handles_[i], "key", &value).IsNotFound()); + failed_ingestion_count.fetch_add(1); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + ASSERT_EQ(kNumCFs, success_ingestion_count + failed_ingestion_count); + Close(); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 63a5f6fc8d..61588b2453 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -95,6 +95,14 @@ Status ExternalSstFileIngestionJob::Prepare( "behind mode."); } + // Overlapping files need at least two different sequence numbers. If settings + // disables global seqno, ingestion will fail anyway, so fail fast in prepare. + if (!ingestion_options_.allow_global_seqno && files_overlap_) { + return Status::InvalidArgument( + "Global seqno is required, but disabled (because external files key " + "range overlaps)."); + } + if (ucmp_->timestamp_size() > 0 && files_overlap_) { return Status::NotSupported( "Files with overlapping ranges cannot be ingested to column " @@ -387,8 +395,16 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed, // REQUIRES: we have become the only writer by entering both write_thread_ and // nonmem_write_thread_ Status ExternalSstFileIngestionJob::Run() { - Status status; SuperVersion* super_version = cfd_->GetSuperVersion(); + // If column family is flushed after Prepare and before Run, we should have a + // specific state of Memtables. The mutable Memtable should be empty, and the + // immutable Memtable list should be empty. + if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 || + super_version->mem->GetDataSize() != 0)) { + return Status::TryAgain( + "Inconsistent memtable state detected when flushed before run."); + } + Status status; #ifndef NDEBUG // We should never run the job with a memtable that is overlapping // with the files we are ingesting diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index df66e9e918..4a853afed9 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -212,6 +212,8 @@ class ExternalSstFileIngestionJob { ~ExternalSstFileIngestionJob() { UnregisterRange(); } + ColumnFamilyData* GetColumnFamilyData() const { return cfd_; } + // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, const std::vector& files_checksums, @@ -229,6 +231,8 @@ class ExternalSstFileIngestionJob { // Thread-safe Status NeedsFlush(bool* flush_needed, SuperVersion* super_version); + void SetFlushedBeforeRun() { flushed_before_run_ = true; } + // Will execute the ingestion job and prepare edit() to be applied. // REQUIRES: Mutex held Status Run(); @@ -371,6 +375,10 @@ class ExternalSstFileIngestionJob { bool need_generate_file_checksum_{true}; std::shared_ptr io_tracer_; + // Flag indicating whether the column family is flushed after `Prepare` and + // before `Run`. + bool flushed_before_run_{false}; + // Below are variables used in (un)registering range for this ingestion job // // FileMetaData used in inputs of compactions equivalent to this ingestion diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 5f4029af2e..4e1b38b81f 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -92,14 +92,12 @@ void MemTableListVersion::Unref(autovector* to_delete) { } int MemTableList::NumNotFlushed() const { - int size = static_cast(current_->memlist_.size()); + int size = current_->NumNotFlushed(); assert(num_flush_not_started_ <= size); return size; } -int MemTableList::NumFlushed() const { - return static_cast(current_->memlist_history_.size()); -} +int MemTableList::NumFlushed() const { return current_->NumFlushed(); } // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. diff --git a/db/memtable_list.h b/db/memtable_list.h index a1e9311aff..48075768cf 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -146,6 +146,10 @@ class MemTableListVersion { uint64_t GetID() const { return id_; } + int NumNotFlushed() const { return static_cast(memlist_.size()); } + + int NumFlushed() const { return static_cast(memlist_history_.size()); } + private: friend class MemTableList;