Add some checks for the file ingestion flow (#13100)

Summary:
This PR does a few misc things for file ingestion flow:

- Add an invalid argument status return for the combination of `allow_global_seqno = false` and external files' key range overlap in `Prepare` stage.
- Add a MemTables status check for when column family is flushed before `Run`.
- Replace the column family dropped check with an assertion after thread enters the write queue and before it exits the write queue, since dropping column family can only happen in the single threaded write queue too and we already checked once after enter write queue.
- Add an `ExternalSstFileIngestionJob::GetColumnFamilyData` API.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13100

Test Plan: Added unit tests, and stress tested the ingestion path

Reviewed By: hx235

Differential Revision: D65180472

Pulled By: jowlyzhang

fbshipit-source-id: 180145dd248a7507a13a543481b135e5a31ebe2d
This commit is contained in:
Yu Zhang 2024-11-05 15:44:56 -08:00 committed by Facebook GitHub Bot
parent 8089eae240
commit dc34a0ff1e
6 changed files with 114 additions and 38 deletions

View File

@ -5961,9 +5961,9 @@ Status DBImpl::IngestExternalFiles(
uint64_t start_file_number = next_file_number;
for (size_t i = 1; i != num_cfs; ++i) {
start_file_number += args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
SuperVersion* super_version =
ingestion_jobs[i].GetColumnFamilyData()->GetReferencedSuperVersion(
this);
Status es = ingestion_jobs[i].Prepare(
args[i].external_files, args[i].files_checksums,
args[i].files_checksum_func_names, args[i].file_temperature,
@ -5977,9 +5977,9 @@ Status DBImpl::IngestExternalFiles(
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
{
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
SuperVersion* super_version =
ingestion_jobs[0].GetColumnFamilyData()->GetReferencedSuperVersion(
this);
Status es = ingestion_jobs[0].Prepare(
args[0].external_files, args[0].files_checksums,
args[0].files_checksum_func_names, args[0].file_temperature,
@ -6030,8 +6030,7 @@ Status DBImpl::IngestExternalFiles(
bool at_least_one_cf_need_flush = false;
std::vector<bool> need_flush(num_cfs, false);
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
if (cfd->IsDropped()) {
// TODO (yanqin) investigate whether we should abort ingestion or
// proceed with other non-dropped column families.
@ -6063,11 +6062,9 @@ Status DBImpl::IngestExternalFiles(
for (size_t i = 0; i != num_cfs; ++i) {
if (need_flush[i]) {
mutex_.Unlock();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
status =
FlushMemTable(ingestion_jobs[i].GetColumnFamilyData(),
flush_opts, FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
@ -6076,6 +6073,13 @@ Status DBImpl::IngestExternalFiles(
}
}
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
if (immutable_db_options_.atomic_flush || need_flush[i]) {
ingestion_jobs[i].SetFlushedBeforeRun();
}
}
}
}
// Run ingestion jobs.
if (status.ok()) {
@ -6096,11 +6100,8 @@ Status DBImpl::IngestExternalFiles(
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (cfd->IsDropped()) {
continue;
}
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
cfds_to_commit.push_back(cfd);
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
autovector<VersionEdit*> edit_list;
@ -6150,21 +6151,17 @@ Status DBImpl::IngestExternalFiles(
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
}
#endif // !NDEBUG
}
}
} else if (versions_->io_status().IsIOError()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
@ -6205,8 +6202,7 @@ Status DBImpl::IngestExternalFiles(
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
if (!cfd->IsDropped()) {
NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
}

View File

@ -1818,6 +1818,9 @@ TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
}
IngestExternalFileOptions ifo;
ifo.allow_global_seqno = false;
ASSERT_NOK(db_->IngestExternalFile(files, ifo));
ifo.allow_global_seqno = true;
ASSERT_OK(db_->IngestExternalFile(files, ifo));
ASSERT_EQ(Get("a"), "a1");
ASSERT_EQ(Get("i"), "i2");
@ -2575,6 +2578,57 @@ TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1);
}
TEST_F(ExternalSSTFileBasicTest, ConcurrentIngestionAndDropColumnFamily) {
int kNumCFs = 10;
Options options = CurrentOptions();
CreateColumnFamilies({"cf_0", "cf_1", "cf_2", "cf_3", "cf_4", "cf_5", "cf_6",
"cf_7", "cf_8", "cf_9"},
options);
IngestExternalFileArg ingest_arg;
IngestExternalFileOptions ifo;
std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
ASSERT_OK(sst_file_writer.Open(external_file));
ASSERT_OK(sst_file_writer.Put("key", "value"));
ASSERT_OK(sst_file_writer.Finish());
ifo.move_files = false;
ingest_arg.external_files = {external_file};
ingest_arg.options = ifo;
std::vector<std::thread> threads;
threads.reserve(2 * kNumCFs);
std::atomic<int> success_ingestion_count = 0;
std::atomic<int> failed_ingestion_count = 0;
for (int i = 0; i < kNumCFs; i++) {
threads.emplace_back(
[this, i]() { ASSERT_OK(db_->DropColumnFamily(handles_[i])); });
threads.emplace_back([this, i, ingest_arg, &success_ingestion_count,
&failed_ingestion_count]() {
IngestExternalFileArg arg_copy = ingest_arg;
arg_copy.column_family = handles_[i];
Status s = db_->IngestExternalFiles({arg_copy});
ReadOptions ropts;
std::string value;
if (s.ok()) {
ASSERT_OK(db_->Get(ropts, handles_[i], "key", &value));
ASSERT_EQ("value", value);
success_ingestion_count.fetch_add(1);
} else {
ASSERT_TRUE(db_->Get(ropts, handles_[i], "key", &value).IsNotFound());
failed_ingestion_count.fetch_add(1);
}
});
}
for (auto& t : threads) {
t.join();
}
ASSERT_EQ(kNumCFs, success_ingestion_count + failed_ingestion_count);
Close();
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),

View File

@ -95,6 +95,14 @@ Status ExternalSstFileIngestionJob::Prepare(
"behind mode.");
}
// Overlapping files need at least two different sequence numbers. If settings
// disables global seqno, ingestion will fail anyway, so fail fast in prepare.
if (!ingestion_options_.allow_global_seqno && files_overlap_) {
return Status::InvalidArgument(
"Global seqno is required, but disabled (because external files key "
"range overlaps).");
}
if (ucmp_->timestamp_size() > 0 && files_overlap_) {
return Status::NotSupported(
"Files with overlapping ranges cannot be ingested to column "
@ -387,8 +395,16 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ExternalSstFileIngestionJob::Run() {
Status status;
SuperVersion* super_version = cfd_->GetSuperVersion();
// If column family is flushed after Prepare and before Run, we should have a
// specific state of Memtables. The mutable Memtable should be empty, and the
// immutable Memtable list should be empty.
if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 ||
super_version->mem->GetDataSize() != 0)) {
return Status::TryAgain(
"Inconsistent memtable state detected when flushed before run.");
}
Status status;
#ifndef NDEBUG
// We should never run the job with a memtable that is overlapping
// with the files we are ingesting

View File

@ -212,6 +212,8 @@ class ExternalSstFileIngestionJob {
~ExternalSstFileIngestionJob() { UnregisterRange(); }
ColumnFamilyData* GetColumnFamilyData() const { return cfd_; }
// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
const std::vector<std::string>& files_checksums,
@ -229,6 +231,8 @@ class ExternalSstFileIngestionJob {
// Thread-safe
Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);
void SetFlushedBeforeRun() { flushed_before_run_ = true; }
// Will execute the ingestion job and prepare edit() to be applied.
// REQUIRES: Mutex held
Status Run();
@ -371,6 +375,10 @@ class ExternalSstFileIngestionJob {
bool need_generate_file_checksum_{true};
std::shared_ptr<IOTracer> io_tracer_;
// Flag indicating whether the column family is flushed after `Prepare` and
// before `Run`.
bool flushed_before_run_{false};
// Below are variables used in (un)registering range for this ingestion job
//
// FileMetaData used in inputs of compactions equivalent to this ingestion

View File

@ -92,14 +92,12 @@ void MemTableListVersion::Unref(autovector<ReadOnlyMemTable*>* to_delete) {
}
int MemTableList::NumNotFlushed() const {
int size = static_cast<int>(current_->memlist_.size());
int size = current_->NumNotFlushed();
assert(num_flush_not_started_ <= size);
return size;
}
int MemTableList::NumFlushed() const {
return static_cast<int>(current_->memlist_history_.size());
}
int MemTableList::NumFlushed() const { return current_->NumFlushed(); }
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.

View File

@ -146,6 +146,10 @@ class MemTableListVersion {
uint64_t GetID() const { return id_; }
int NumNotFlushed() const { return static_cast<int>(memlist_.size()); }
int NumFlushed() const { return static_cast<int>(memlist_history_.size()); }
private:
friend class MemTableList;