mirror of https://github.com/facebook/rocksdb.git
Refactor external sst file ingestion job (#12305)
Summary: Updates some documentations and invariant assertions after https://github.com/facebook/rocksdb/issues/12257 and https://github.com/facebook/rocksdb/issues/12284. Also refactored some duplicate code and improved some error message and preconditions for errors. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12305 Test Plan: Existing unit tests Reviewed By: hx235 Differential Revision: D53371325 Pulled By: jowlyzhang fbshipit-source-id: fb0edcb3a3602cdf0a292ef437cfdfe897fc6c99
This commit is contained in:
parent
5620efc794
commit
4eaa771c01
|
@ -97,7 +97,9 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
}
|
||||
|
||||
if (ingestion_options_.ingest_behind && files_overlap_) {
|
||||
return Status::NotSupported("Files have overlapping ranges");
|
||||
return Status::NotSupported(
|
||||
"Files with overlapping ranges cannot be ingested with ingestion "
|
||||
"behind mode.");
|
||||
}
|
||||
|
||||
// Copy/Move external files into DB
|
||||
|
@ -142,7 +144,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
// Original file is on a different FS, use copy instead of hard linking.
|
||||
f.copy_file = true;
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
"Triy to link file %s but it's not supported : %s",
|
||||
"Tried to link file %s but it's not supported : %s",
|
||||
path_outside_db.c_str(), status.ToString().c_str());
|
||||
}
|
||||
} else {
|
||||
|
@ -188,7 +190,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
// Generate and check the sst file checksum. Note that, if
|
||||
// IngestExternalFileOptions::write_global_seqno is true, we will not update
|
||||
// the checksum information in the files_to_ingests_ here, since the file is
|
||||
// upadted with the new global_seqno. After global_seqno is updated, DB will
|
||||
// updated with the new global_seqno. After global_seqno is updated, DB will
|
||||
// generate the new checksum and store it in the Manifest. In all other cases
|
||||
// if ingestion_options_.write_global_seqno == true and
|
||||
// verify_file_checksum is false, we only check the checksum function name.
|
||||
|
@ -299,8 +301,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
}
|
||||
}
|
||||
} else if (files_checksums.size() != files_checksum_func_names.size() ||
|
||||
(files_checksums.size() == files_checksum_func_names.size() &&
|
||||
files_checksums.size() != 0)) {
|
||||
files_checksums.size() != 0) {
|
||||
// The checksum or checksum function name vector are not both empty
|
||||
// and they are incomplete.
|
||||
status = Status::InvalidArgument(
|
||||
|
@ -316,21 +317,9 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: The following is duplicated with Cleanup().
|
||||
if (!status.ok()) {
|
||||
IOOptions io_opts;
|
||||
// We failed, remove all files that we copied into the db
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.internal_file_path.empty()) {
|
||||
continue;
|
||||
}
|
||||
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(db_options_.info_log,
|
||||
"AddFile() clean up for file %s failed : %s",
|
||||
f.internal_file_path.c_str(), s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
DeleteInternalFiles();
|
||||
}
|
||||
|
||||
return status;
|
||||
|
@ -431,26 +420,26 @@ Status ExternalSstFileIngestionJob::Run() {
|
|||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
if (smallest_parsed.sequence == 0) {
|
||||
if (smallest_parsed.sequence == 0 && assigned_seqno != 0) {
|
||||
UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
|
||||
smallest_parsed.type);
|
||||
}
|
||||
if (largest_parsed.sequence == 0) {
|
||||
if (largest_parsed.sequence == 0 && assigned_seqno != 0) {
|
||||
UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
|
||||
largest_parsed.type);
|
||||
}
|
||||
|
||||
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
|
||||
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
|
||||
&assigned_seqno);
|
||||
if (assigned_seqno > last_seqno) {
|
||||
assert(assigned_seqno == last_seqno + 1);
|
||||
last_seqno = assigned_seqno;
|
||||
++consumed_seqno_count_;
|
||||
}
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
|
||||
&assigned_seqno);
|
||||
assert(assigned_seqno == 0 || assigned_seqno == last_seqno + 1);
|
||||
if (assigned_seqno > last_seqno) {
|
||||
last_seqno = assigned_seqno;
|
||||
++consumed_seqno_count_;
|
||||
}
|
||||
|
||||
status = GenerateChecksumForIngestedFile(&f);
|
||||
if (!status.ok()) {
|
||||
|
@ -631,17 +620,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
|
|||
if (!status.ok()) {
|
||||
// We failed to add the files to the database
|
||||
// remove all the files we copied
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.internal_file_path.empty()) {
|
||||
continue;
|
||||
}
|
||||
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(db_options_.info_log,
|
||||
"AddFile() clean up for file %s failed : %s",
|
||||
f.internal_file_path.c_str(), s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
DeleteInternalFiles();
|
||||
consumed_seqno_count_ = 0;
|
||||
files_overlap_ = false;
|
||||
} else if (status.ok() && ingestion_options_.move_files) {
|
||||
|
@ -659,6 +638,21 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
|
|||
}
|
||||
}
|
||||
|
||||
void ExternalSstFileIngestionJob::DeleteInternalFiles() {
|
||||
IOOptions io_opts;
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.internal_file_path.empty()) {
|
||||
continue;
|
||||
}
|
||||
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
|
||||
if (!s.ok()) {
|
||||
ROCKS_LOG_WARN(db_options_.info_log,
|
||||
"AddFile() clean up for file %s failed : %s",
|
||||
f.internal_file_path.c_str(), s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||
const std::string& external_file, uint64_t new_file_number,
|
||||
IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
|
||||
|
@ -1001,7 +995,8 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
|
|||
return Status::OK();
|
||||
} else if (!ingestion_options_.allow_global_seqno) {
|
||||
return Status::InvalidArgument("Global seqno is required, but disabled");
|
||||
} else if (file_to_ingest->global_seqno_offset == 0) {
|
||||
} else if (ingestion_options_.write_global_seqno &&
|
||||
file_to_ingest->global_seqno_offset == 0) {
|
||||
return Status::InvalidArgument(
|
||||
"Trying to set global seqno for a file that don't have a global seqno "
|
||||
"field");
|
||||
|
@ -1073,8 +1068,8 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
|
|||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
file_to_ingest->file_checksum = file_checksum;
|
||||
file_to_ingest->file_checksum_func_name = file_checksum_func_name;
|
||||
file_to_ingest->file_checksum = std::move(file_checksum);
|
||||
file_to_ingest->file_checksum_func_name = std::move(file_checksum_func_name);
|
||||
return IOStatus::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ struct IngestedFileInfo {
|
|||
uint64_t num_entries;
|
||||
// total number of range deletions in external file
|
||||
uint64_t num_range_deletions;
|
||||
// Id of column family this file shoule be ingested into
|
||||
// Id of column family this file should be ingested into
|
||||
uint32_t cf_id;
|
||||
// TableProperties read from external file
|
||||
TableProperties table_properties;
|
||||
|
@ -102,16 +102,7 @@ class ExternalSstFileIngestionJob {
|
|||
assert(directories != nullptr);
|
||||
}
|
||||
|
||||
~ExternalSstFileIngestionJob() {
|
||||
for (const auto& c : file_ingesting_compactions_) {
|
||||
cfd_->compaction_picker()->UnregisterCompaction(c);
|
||||
delete c;
|
||||
}
|
||||
|
||||
for (const auto& f : compaction_input_metdatas_) {
|
||||
delete f;
|
||||
}
|
||||
}
|
||||
~ExternalSstFileIngestionJob() { UnregisterRange(); }
|
||||
|
||||
// Prepare the job by copying external files into the DB.
|
||||
Status Prepare(const std::vector<std::string>& external_files_paths,
|
||||
|
@ -156,7 +147,7 @@ class ExternalSstFileIngestionJob {
|
|||
return files_to_ingest_;
|
||||
}
|
||||
|
||||
// How many sequence numbers did we consume as part of the ingest job?
|
||||
// How many sequence numbers did we consume as part of the ingestion job?
|
||||
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }
|
||||
|
||||
private:
|
||||
|
@ -203,6 +194,9 @@ class ExternalSstFileIngestionJob {
|
|||
// compactions.
|
||||
void CreateEquivalentFileIngestingCompactions();
|
||||
|
||||
// Remove all the internal files created, called when ingestion job fails.
|
||||
void DeleteInternalFiles();
|
||||
|
||||
SystemClock* clock_;
|
||||
FileSystemPtr fs_;
|
||||
VersionSet* versions_;
|
||||
|
|
Loading…
Reference in New Issue