mirror of https://github.com/facebook/rocksdb.git
Support to create a CF by importing multiple non-overlapping CFs (#11378)
Summary: The original feature request is [https://github.com/facebook/rocksdb/issues/11317](https://github.com/facebook/rocksdb/issues/11317). Flink uses RocksDB as its state backend; all DB instances share the same options, and the key ranges of the instances are adjacent, with no key overlap between any two instances. In Flink's rescaling scenario, a DB must be split quickly along a key range, or multiple DBs must be merged quickly into one. This PR addresses the merge case: it extends `CreateColumnFamilyWithImport()` to create a ColumnFamily by importing multiple ColumnFamilies whose keys do not overlap. The import logic is almost the same as the single-CF `CreateColumnFamilyWithImport()`, except that it checks for key overlap between the imported CFs and fails the import if any overlap exists.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11378

Reviewed By: ajkr

Differential Revision: D46413709

Pulled By: cbi42

fbshipit-source-id: 846d0049fad11c59cf460fa846c345b26c658dfb
This commit is contained in:
parent 70bf5ef093
commit fa878a0107
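Before the per-file diffs, a minimal usage sketch of the new overload, distilled from the test added in this PR. It assumes two source DBs whose column families hold disjoint user-key ranges; the function name, export directories, and handle names are illustrative only, and error handling is compressed.

// Sketch only: merge two non-overlapping CFs into one CF of `dst`.
#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"

using namespace ROCKSDB_NAMESPACE;

Status MergeTwoColumnFamilies(DB* db1, ColumnFamilyHandle* src_cfh1,
                              DB* db2, ColumnFamilyHandle* src_cfh2,
                              DB* dst, ColumnFamilyHandle** dst_cfh) {
  // Export each source CF to its own directory via a checkpoint.
  Checkpoint* cp1 = nullptr;
  Checkpoint* cp2 = nullptr;
  ExportImportFilesMetaData* meta1 = nullptr;
  ExportImportFilesMetaData* meta2 = nullptr;
  Status s = Checkpoint::Create(db1, &cp1);
  if (s.ok()) s = cp1->ExportColumnFamily(src_cfh1, "/tmp/export1", &meta1);
  if (s.ok()) s = Checkpoint::Create(db2, &cp2);
  if (s.ok()) s = cp2->ExportColumnFamily(src_cfh2, "/tmp/export2", &meta2);

  if (s.ok()) {
    // The new overload takes a vector of metadata pointers; the import
    // fails with InvalidArgument if the CFs' user-key ranges overlap.
    ImportColumnFamilyOptions import_opts;
    import_opts.move_files = false;  // copy, keeping the exported files intact
    std::vector<const ExportImportFilesMetaData*> metadatas = {meta1, meta2};
    s = dst->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "merged",
                                          import_opts, metadatas, dst_cfh);
  }
  delete meta1;
  delete meta2;
  delete cp1;
  delete cp2;
  return s;
}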
@@ -118,12 +118,13 @@ class CompactedDBImpl : public DBImpl {
       const IngestExternalFileOptions& /*ingestion_options*/) override {
     return Status::NotSupported("Not supported in compacted db mode.");
   }

   using DB::CreateColumnFamilyWithImport;
   virtual Status CreateColumnFamilyWithImport(
       const ColumnFamilyOptions& /*options*/,
       const std::string& /*column_family_name*/,
       const ImportColumnFamilyOptions& /*import_options*/,
-      const ExportImportFilesMetaData& /*metadata*/,
+      const std::vector<const ExportImportFilesMetaData*>& /*metadatas*/,
       ColumnFamilyHandle** /*handle*/) override {
     return Status::NotSupported("Not supported in compacted db mode.");
   }

@@ -5575,14 +5575,24 @@ Status DBImpl::IngestExternalFiles(
 Status DBImpl::CreateColumnFamilyWithImport(
     const ColumnFamilyOptions& options, const std::string& column_family_name,
     const ImportColumnFamilyOptions& import_options,
-    const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
+    const std::vector<const ExportImportFilesMetaData*>& metadatas,
+    ColumnFamilyHandle** handle) {
   assert(handle != nullptr);
   assert(*handle == nullptr);
   // TODO: plumb Env::IOActivity
   const ReadOptions read_options;
   std::string cf_comparator_name = options.comparator->Name();
-  if (cf_comparator_name != metadata.db_comparator_name) {
-    return Status::InvalidArgument("Comparator name mismatch");
+  size_t total_file_num = 0;
+  std::vector<std::vector<LiveFileMetaData*>> metadata_files(metadatas.size());
+  for (size_t i = 0; i < metadatas.size(); i++) {
+    if (cf_comparator_name != metadatas[i]->db_comparator_name) {
+      return Status::InvalidArgument("Comparator name mismatch");
+    }
+    for (auto& file : metadatas[i]->files) {
+      metadata_files[i].push_back((LiveFileMetaData*)&file);
+    }
+    total_file_num += metadatas[i]->files.size();
   }

   // Create column family.

@@ -5596,7 +5606,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
   auto cfd = cfh->cfd();
   ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
                                    file_options_, import_options,
-                                   metadata.files, io_tracer_);
+                                   metadata_files, io_tracer_);

   SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
   VersionEdit dummy_edit;

@@ -5619,7 +5629,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
   // reuse the file number that has already assigned to the internal file,
   // and this will overwrite the external file. To protect the external
   // file, we have to make sure the file number will never being reused.
-  next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
+  next_file_number = versions_->FetchAddFileNumber(total_file_num);
   auto cf_options = cfd->GetLatestMutableCFOptions();
   status =
       versions_->LogAndApply(cfd, *cf_options, read_options, &dummy_edit,

@@ -531,7 +531,7 @@ class DBImpl : public DB {
   virtual Status CreateColumnFamilyWithImport(
       const ColumnFamilyOptions& options, const std::string& column_family_name,
       const ImportColumnFamilyOptions& import_options,
-      const ExportImportFilesMetaData& metadata,
+      const std::vector<const ExportImportFilesMetaData*>& metadatas,
       ColumnFamilyHandle** handle) override;

   using DB::ClipColumnFamily;

@@ -137,7 +137,7 @@ class DBImplReadOnly : public DBImpl {
       const ColumnFamilyOptions& /*options*/,
       const std::string& /*column_family_name*/,
       const ImportColumnFamilyOptions& /*import_options*/,
-      const ExportImportFilesMetaData& /*metadata*/,
+      const std::vector<const ExportImportFilesMetaData*>& /*metadatas*/,
       ColumnFamilyHandle** /*handle*/) override {
     return Status::NotSupported("Not supported operation in read only mode.");
   }

@@ -3097,7 +3097,7 @@ class ModelDB : public DB {
       const ColumnFamilyOptions& /*options*/,
       const std::string& /*column_family_name*/,
       const ImportColumnFamilyOptions& /*import_options*/,
-      const ExportImportFilesMetaData& /*metadata*/,
+      const std::vector<const ExportImportFilesMetaData*>& /*metadatas*/,
       ColumnFamilyHandle** /*handle*/) override {
     return Status::NotSupported("Not implemented.");
   }

@@ -29,79 +29,132 @@ namespace ROCKSDB_NAMESPACE {
 Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
                                       SuperVersion* sv) {
   Status status;
-
-  // Read the information of files we are importing
-  for (const auto& file_metadata : metadata_) {
-    const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
-    IngestedFileInfo file_to_import;
-    status = GetIngestedFileInfo(file_path, next_file_number++, sv,
-                                 file_metadata, &file_to_import);
-    if (!status.ok()) {
-      return status;
-    }
-    files_to_import_.push_back(file_to_import);
-  }
-
-  auto num_files = files_to_import_.size();
-  if (num_files == 0) {
-    status = Status::InvalidArgument("The list of files is empty");
-    return status;
-  }
-
-  for (const auto& f : files_to_import_) {
-    if (f.num_entries == 0) {
-      status = Status::InvalidArgument("File contain no entries");
-      return status;
-    }
-
-    if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) {
-      status = Status::Corruption("File has corrupted keys");
-      return status;
-    }
-  }
-
-  // Copy/Move external files into DB
-  auto hardlink_files = import_options_.move_files;
-  for (auto& f : files_to_import_) {
-    const auto path_outside_db = f.external_file_path;
-    const auto path_inside_db = TableFileName(
-        cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
-
-    if (hardlink_files) {
-      status =
-          fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
-      if (status.IsNotSupported()) {
-        // Original file is on a different FS, use copy instead of hard linking
-        hardlink_files = false;
-        ROCKS_LOG_INFO(db_options_.info_log,
-                       "Try to link file %s but it's not supported : %s",
-                       f.internal_file_path.c_str(), status.ToString().c_str());
-      }
-    }
-    if (!hardlink_files) {
-      status =
-          CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
-                   db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
-    }
-    if (!status.ok()) {
-      break;
-    }
-    f.copy_file = !hardlink_files;
-    f.internal_file_path = path_inside_db;
-  }
-
-  if (!status.ok()) {
-    // We failed, remove all files that we copied into the db
-    for (const auto& f : files_to_import_) {
-      if (f.internal_file_path.empty()) {
-        break;
-      }
-      const auto s =
-          fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
-      if (!s.ok()) {
-        ROCKS_LOG_WARN(db_options_.info_log,
-                       "AddFile() clean up for file %s failed : %s",
-                       f.internal_file_path.c_str(), s.ToString().c_str());
-      }
-    }
-  }
+  std::vector<ColumnFamilyIngestFileInfo> cf_ingest_infos;
+  for (const auto& metadata_per_cf : metadatas_) {
+    // Read the information of files we are importing
+    ColumnFamilyIngestFileInfo cf_file_info;
+    InternalKey smallest, largest;
+    int num_files = 0;
+    std::vector<IngestedFileInfo> files_to_import_per_cf;
+    for (size_t i = 0; i < metadata_per_cf.size(); i++) {
+      auto file_metadata = *metadata_per_cf[i];
+      const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
+      IngestedFileInfo file_to_import;
+      status = GetIngestedFileInfo(file_path, next_file_number++, sv,
+                                   file_metadata, &file_to_import);
+      if (!status.ok()) {
+        return status;
+      }
+
+      if (file_to_import.num_entries == 0) {
+        status = Status::InvalidArgument("File contain no entries");
+        return status;
+      }
+
+      if (!file_to_import.smallest_internal_key.Valid() ||
+          !file_to_import.largest_internal_key.Valid()) {
+        status = Status::Corruption("File has corrupted keys");
+        return status;
+      }
+
+      files_to_import_per_cf.push_back(file_to_import);
+      num_files++;
+
+      // Calculate the smallest and largest keys of all files in this CF
+      if (i == 0) {
+        smallest = file_to_import.smallest_internal_key;
+        largest = file_to_import.largest_internal_key;
+      } else {
+        if (cfd_->internal_comparator().Compare(
+                smallest, file_to_import.smallest_internal_key) < 0) {
+          smallest = file_to_import.smallest_internal_key;
+        }
+        if (cfd_->internal_comparator().Compare(
+                largest, file_to_import.largest_internal_key) > 0) {
+          largest = file_to_import.largest_internal_key;
+        }
+      }
+    }
+
+    if (num_files == 0) {
+      status = Status::InvalidArgument("The list of files is empty");
+      return status;
+    }
+    files_to_import_.push_back(files_to_import_per_cf);
+    cf_file_info.smallest_internal_key = smallest;
+    cf_file_info.largest_internal_key = largest;
+    cf_ingest_infos.push_back(cf_file_info);
+  }
+
+  std::sort(cf_ingest_infos.begin(), cf_ingest_infos.end(),
+            [this](const ColumnFamilyIngestFileInfo& info1,
+                   const ColumnFamilyIngestFileInfo& info2) {
+              return cfd_->user_comparator()->Compare(
+                         info1.smallest_internal_key.user_key(),
+                         info2.smallest_internal_key.user_key()) < 0;
+            });
+
+  for (size_t i = 0; i + 1 < cf_ingest_infos.size(); i++) {
+    if (cfd_->user_comparator()->Compare(
+            cf_ingest_infos[i].largest_internal_key.user_key(),
+            cf_ingest_infos[i + 1].smallest_internal_key.user_key()) >= 0) {
+      status = Status::InvalidArgument("CFs have overlapping ranges");
+      return status;
+    }
+  }
+
+  // Copy/Move external files into DB
+  auto hardlink_files = import_options_.move_files;
+
+  for (auto& files_to_import_per_cf : files_to_import_) {
+    for (auto& f : files_to_import_per_cf) {
+      const auto path_outside_db = f.external_file_path;
+      const auto path_inside_db = TableFileName(
+          cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
+
+      if (hardlink_files) {
+        status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(),
+                               nullptr);
+        if (status.IsNotSupported()) {
+          // Original file is on a different FS, use copy instead of hard
+          // linking
+          hardlink_files = false;
+          ROCKS_LOG_INFO(db_options_.info_log,
+                         "Try to link file %s but it's not supported : %s",
+                         f.internal_file_path.c_str(),
+                         status.ToString().c_str());
+        }
+      }
+      if (!hardlink_files) {
+        status =
+            CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
+                     db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
+      }
+      if (!status.ok()) {
+        break;
+      }
+      f.copy_file = !hardlink_files;
+      f.internal_file_path = path_inside_db;
+    }
+    if (!status.ok()) {
+      break;
+    }
+  }
+
+  if (!status.ok()) {
+    // We failed, remove all files that we copied into the db
+    for (auto& files_to_import_per_cf : files_to_import_) {
+      for (auto& f : files_to_import_per_cf) {
+        if (f.internal_file_path.empty()) {
+          break;
+        }
+        const auto s =
+            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
+        if (!s.ok()) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "AddFile() clean up for file %s failed : %s",
+                         f.internal_file_path.c_str(), s.ToString().c_str());
+        }
+      }
+    }
+  }

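The heart of the new Prepare() logic above is the pairwise range check: sort the per-CF [smallest, largest] user-key ranges by their start key, then require each range to end strictly before the next one begins. A standalone sketch of that idea follows; it is not RocksDB code, and plain std::string ordering stands in for the user comparator.

#include <algorithm>
#include <string>
#include <vector>

struct CfRange {
  std::string smallest;  // smallest user key in the CF
  std::string largest;   // largest user key in the CF
};

bool RangesDisjoint(std::vector<CfRange> ranges) {
  // Sort by start key, as Prepare() does with cf_ingest_infos.
  std::sort(ranges.begin(), ranges.end(),
            [](const CfRange& a, const CfRange& b) {
              return a.smallest < b.smallest;
            });
  for (size_t i = 0; i + 1 < ranges.size(); i++) {
    // Any neighbor whose start is <= the previous end overlaps;
    // this corresponds to the "CFs have overlapping ranges" error.
    if (ranges[i].largest >= ranges[i + 1].smallest) {
      return false;
    }
  }
  return true;
}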
@@ -134,30 +187,35 @@ Status ImportColumnFamilyJob::Run() {
       nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
       EpochNumberRequirement::kMightMissing);
   Status s;

   for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
-    const auto& f = files_to_import_[i];
-    const auto& file_metadata = metadata_[i];
-
-    uint64_t tail_size = 0;
-    bool contain_no_data_blocks = f.table_properties.num_entries > 0 &&
-                                  (f.table_properties.num_entries ==
-                                   f.table_properties.num_range_deletions);
-    if (f.table_properties.tail_start_offset > 0 || contain_no_data_blocks) {
-      uint64_t file_size = f.fd.GetFileSize();
-      assert(f.table_properties.tail_start_offset <= file_size);
-      tail_size = file_size - f.table_properties.tail_start_offset;
-    }
-
-    VersionEdit dummy_version_edit;
-    dummy_version_edit.AddFile(
-        file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
-        f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
-        file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
-        file_metadata.temperature, kInvalidBlobFileNumber, oldest_ancester_time,
-        current_time, file_metadata.epoch_number, kUnknownFileChecksum,
-        kUnknownFileChecksumFuncName, f.unique_id, 0, tail_size);
-    s = dummy_version_builder.Apply(&dummy_version_edit);
+    for (size_t j = 0; s.ok() && j < files_to_import_[i].size(); ++j) {
+      const auto& f = files_to_import_[i][j];
+      const auto& file_metadata = *metadatas_[i][j];
+
+      uint64_t tail_size = 0;
+      bool contain_no_data_blocks = f.table_properties.num_entries > 0 &&
+                                    (f.table_properties.num_entries ==
+                                     f.table_properties.num_range_deletions);
+      if (f.table_properties.tail_start_offset > 0 || contain_no_data_blocks) {
+        uint64_t file_size = f.fd.GetFileSize();
+        assert(f.table_properties.tail_start_offset <= file_size);
+        tail_size = file_size - f.table_properties.tail_start_offset;
+      }
+
+      VersionEdit dummy_version_edit;
+      dummy_version_edit.AddFile(
+          file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
+          f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
+          file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
+          file_metadata.temperature, kInvalidBlobFileNumber,
+          oldest_ancester_time, current_time, file_metadata.epoch_number,
+          kUnknownFileChecksum, kUnknownFileChecksumFuncName, f.unique_id, 0,
+          tail_size);
+      s = dummy_version_builder.Apply(&dummy_version_edit);
+    }
   }

   if (s.ok()) {
     s = dummy_version_builder.SaveTo(&dummy_vstorage);
   }

@@ -198,26 +256,30 @@ Status ImportColumnFamilyJob::Run() {
 void ImportColumnFamilyJob::Cleanup(const Status& status) {
   if (!status.ok()) {
     // We failed to add files to the database remove all the files we copied.
-    for (const auto& f : files_to_import_) {
-      const auto s =
-          fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
-      if (!s.ok()) {
-        ROCKS_LOG_WARN(db_options_.info_log,
-                       "AddFile() clean up for file %s failed : %s",
-                       f.internal_file_path.c_str(), s.ToString().c_str());
+    for (auto& files_to_import_per_cf : files_to_import_) {
+      for (auto& f : files_to_import_per_cf) {
+        const auto s =
+            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
+        if (!s.ok()) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "AddFile() clean up for file %s failed : %s",
+                         f.internal_file_path.c_str(), s.ToString().c_str());
+        }
       }
     }
   } else if (status.ok() && import_options_.move_files) {
     // The files were moved and added successfully, remove original file links
-    for (IngestedFileInfo& f : files_to_import_) {
-      const auto s =
-          fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
-      if (!s.ok()) {
-        ROCKS_LOG_WARN(
-            db_options_.info_log,
-            "%s was added to DB successfully but failed to remove original "
-            "file link : %s",
-            f.external_file_path.c_str(), s.ToString().c_str());
+    for (auto& files_to_import_per_cf : files_to_import_) {
+      for (auto& f : files_to_import_per_cf) {
+        const auto s =
+            fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
+        if (!s.ok()) {
+          ROCKS_LOG_WARN(
+              db_options_.info_log,
+              "%s was added to DB successfully but failed to remove original "
+              "file link : %s",
+              f.external_file_path.c_str(), s.ToString().c_str());
+        }
       }
     }
   }
 }

@@ -25,13 +25,22 @@ class SystemClock;
 // Imports a set of sst files as is into a new column family. Logic is similar
 // to ExternalSstFileIngestionJob.
 class ImportColumnFamilyJob {
+  // All file information of an imported CF, mainly used to
+  // calculate whether there is overlap between CFs
+  struct ColumnFamilyIngestFileInfo {
+    // Smallest internal key in cf
+    InternalKey smallest_internal_key;
+    // Largest internal key in cf
+    InternalKey largest_internal_key;
+  };
+
  public:
-  ImportColumnFamilyJob(VersionSet* versions, ColumnFamilyData* cfd,
-                        const ImmutableDBOptions& db_options,
-                        const EnvOptions& env_options,
-                        const ImportColumnFamilyOptions& import_options,
-                        const std::vector<LiveFileMetaData>& metadata,
-                        const std::shared_ptr<IOTracer>& io_tracer)
+  ImportColumnFamilyJob(
+      VersionSet* versions, ColumnFamilyData* cfd,
+      const ImmutableDBOptions& db_options, const EnvOptions& env_options,
+      const ImportColumnFamilyOptions& import_options,
+      const std::vector<std::vector<LiveFileMetaData*>>& metadatas,
+      const std::shared_ptr<IOTracer>& io_tracer)
       : clock_(db_options.clock),
         versions_(versions),
         cfd_(cfd),

@@ -39,7 +48,7 @@ class ImportColumnFamilyJob {
         fs_(db_options_.fs, io_tracer),
         env_options_(env_options),
         import_options_(import_options),
-        metadata_(metadata),
+        metadatas_(metadatas),
         io_tracer_(io_tracer) {}

   // Prepare the job by copying external files into the DB.

@@ -54,7 +63,7 @@ class ImportColumnFamilyJob {

   VersionEdit* edit() { return &edit_; }

-  const autovector<IngestedFileInfo>& files_to_import() const {
+  const std::vector<std::vector<IngestedFileInfo>>& files_to_import() const {
     return files_to_import_;
   }

@@ -72,10 +81,10 @@ class ImportColumnFamilyJob {
   const ImmutableDBOptions& db_options_;
   const FileSystemPtr fs_;
   const EnvOptions& env_options_;
-  autovector<IngestedFileInfo> files_to_import_;
+  std::vector<std::vector<IngestedFileInfo>> files_to_import_;
   VersionEdit edit_;
   const ImportColumnFamilyOptions& import_options_;
-  std::vector<LiveFileMetaData> metadata_;
+  const std::vector<std::vector<LiveFileMetaData*>> metadatas_;
   const std::shared_ptr<IOTracer> io_tracer_;
 };

@@ -22,10 +22,13 @@ class ImportColumnFamilyTest : public DBTestBase {
       : DBTestBase("import_column_family_test", /*env_do_fsync=*/true) {
     sst_files_dir_ = dbname_ + "/sst_files/";
     export_files_dir_ = test::PerThreadDBPath(env_, "export");
+    export_files_dir2_ = test::PerThreadDBPath(env_, "export2");
+
     DestroyAndRecreateExternalSSTFilesDir();
     import_cfh_ = nullptr;
     import_cfh2_ = nullptr;
     metadata_ptr_ = nullptr;
+    metadata_ptr2_ = nullptr;
   }

   ~ImportColumnFamilyTest() {

@@ -43,14 +46,21 @@ class ImportColumnFamilyTest : public DBTestBase {
       delete metadata_ptr_;
       metadata_ptr_ = nullptr;
     }

+    if (metadata_ptr2_) {
+      delete metadata_ptr2_;
+      metadata_ptr2_ = nullptr;
+    }
     EXPECT_OK(DestroyDir(env_, sst_files_dir_));
     EXPECT_OK(DestroyDir(env_, export_files_dir_));
+    EXPECT_OK(DestroyDir(env_, export_files_dir2_));
   }

   void DestroyAndRecreateExternalSSTFilesDir() {
     EXPECT_OK(DestroyDir(env_, sst_files_dir_));
     EXPECT_OK(env_->CreateDir(sst_files_dir_));
     EXPECT_OK(DestroyDir(env_, export_files_dir_));
+    EXPECT_OK(DestroyDir(env_, export_files_dir2_));
   }

   LiveFileMetaData LiveFileMetaDataInit(std::string name, std::string path,

@@ -69,9 +79,11 @@ class ImportColumnFamilyTest : public DBTestBase {
  protected:
   std::string sst_files_dir_;
   std::string export_files_dir_;
+  std::string export_files_dir2_;
   ColumnFamilyHandle* import_cfh_;
   ColumnFamilyHandle* import_cfh2_;
   ExportImportFilesMetaData* metadata_ptr_;
+  ExportImportFilesMetaData* metadata_ptr2_;
 };

 TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) {

@@ -738,6 +750,137 @@ TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) {
   }
 }

+TEST_F(ImportColumnFamilyTest, ImportMultiColumnFamilyTest) {
+  Options options = CurrentOptions();
+  CreateAndReopenWithCF({"koko"}, options);
+
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
+  }
+  ASSERT_OK(Flush(1));
+
+  ASSERT_OK(
+      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
+
+  // Overwrite the value in the same set of keys.
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite"));
+  }
+
+  // Flush again to create another L0 file. It should have higher sequencer.
+  ASSERT_OK(Flush(1));
+
+  Checkpoint* checkpoint1;
+  Checkpoint* checkpoint2;
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint1));
+  ASSERT_OK(checkpoint1->ExportColumnFamily(handles_[1], export_files_dir_,
+                                            &metadata_ptr_));
+
+  // Create a new db and import the files.
+  DB* db_copy;
+  ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
+  ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
+  ColumnFamilyHandle* copy_cfh = nullptr;
+  ASSERT_OK(db_copy->CreateColumnFamily(options, "koko", &copy_cfh));
+  WriteOptions wo;
+  for (int i = 100; i < 200; ++i) {
+    ASSERT_OK(db_copy->Put(wo, copy_cfh, Key(i), Key(i) + "_val"));
+  }
+  ASSERT_OK(db_copy->Flush(FlushOptions()));
+  for (int i = 100; i < 200; ++i) {
+    ASSERT_OK(db_copy->Put(wo, copy_cfh, Key(i), Key(i) + "_overwrite"));
+  }
+  ASSERT_OK(db_copy->Flush(FlushOptions()));
+  for (int i = 100; i < 200; ++i) {
+    ASSERT_OK(db_copy->Put(wo, copy_cfh, Key(i), Key(i) + "_overwrite2"));
+  }
+  ASSERT_OK(db_copy->Flush(FlushOptions()));
+
+  // Flush again to create another L0 file. It should have higher sequencer.
+  ASSERT_OK(Checkpoint::Create(db_copy, &checkpoint2));
+  ASSERT_OK(checkpoint2->ExportColumnFamily(copy_cfh, export_files_dir2_,
+                                            &metadata_ptr2_));
+
+  ASSERT_NE(metadata_ptr_, nullptr);
+  ASSERT_NE(metadata_ptr2_, nullptr);
+  delete checkpoint1;
+  delete checkpoint2;
+  ImportColumnFamilyOptions import_options;
+  import_options.move_files = false;
+
+  std::vector<const ExportImportFilesMetaData*> metadatas = {metadata_ptr_,
+                                                             metadata_ptr2_};
+  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
+                                              metadatas, &import_cfh_));
+
+  std::string value1, value2;
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
+    ASSERT_EQ(Get(1, Key(i)), value1);
+  }
+
+  for (int i = 100; i < 200; ++i) {
+    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
+    ASSERT_OK(db_copy->Get(ReadOptions(), copy_cfh, Key(i), &value2));
+    ASSERT_EQ(value1, value2);
+  }
+
+  ASSERT_OK(db_copy->DropColumnFamily(copy_cfh));
+  ASSERT_OK(db_copy->DestroyColumnFamilyHandle(copy_cfh));
+  delete db_copy;
+  ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
+}
+
+TEST_F(ImportColumnFamilyTest, ImportMultiColumnFamilyWithOverlap) {
+  Options options = CurrentOptions();
+  CreateAndReopenWithCF({"koko"}, options);
+
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
+  }
+
+  Checkpoint* checkpoint1;
+  Checkpoint* checkpoint2;
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint1));
+  ASSERT_OK(checkpoint1->ExportColumnFamily(handles_[1], export_files_dir_,
+                                            &metadata_ptr_));
+
+  // Create a new db and import the files.
+  DB* db_copy;
+  ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
+  ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy));
+  ColumnFamilyHandle* copy_cfh = nullptr;
+  ASSERT_OK(db_copy->CreateColumnFamily(options, "koko", &copy_cfh));
+  WriteOptions wo;
+  for (int i = 50; i < 150; ++i) {
+    ASSERT_OK(db_copy->Put(wo, copy_cfh, Key(i), Key(i) + "_val"));
+  }
+  ASSERT_OK(db_copy->Flush(FlushOptions()));
+
+  ASSERT_OK(Checkpoint::Create(db_copy, &checkpoint2));
+  ASSERT_OK(checkpoint2->ExportColumnFamily(copy_cfh, export_files_dir2_,
+                                            &metadata_ptr2_));
+
+  ASSERT_NE(metadata_ptr_, nullptr);
+  ASSERT_NE(metadata_ptr2_, nullptr);
+  delete checkpoint1;
+  delete checkpoint2;
+  ImportColumnFamilyOptions import_options;
+  import_options.move_files = false;
+
+  std::vector<const ExportImportFilesMetaData*> metadatas = {metadata_ptr_,
+                                                             metadata_ptr2_};
+
+  ASSERT_EQ(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
+                                              metadatas, &import_cfh_),
+            Status::InvalidArgument("CFs have overlapping ranges"));
+
+  ASSERT_OK(db_copy->DropColumnFamily(copy_cfh));
+  ASSERT_OK(db_copy->DestroyColumnFamilyHandle(copy_cfh));
+  delete db_copy;
+  ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
+}
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {

@@ -1782,14 +1782,27 @@ class DB {
   virtual Status CreateColumnFamilyWithImport(
       const ColumnFamilyOptions& options, const std::string& column_family_name,
       const ImportColumnFamilyOptions& import_options,
-      const ExportImportFilesMetaData& metadata,
+      const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
+    const std::vector<const ExportImportFilesMetaData*>& metadatas{&metadata};
+    return CreateColumnFamilyWithImport(options, column_family_name,
+                                        import_options, metadatas, handle);
+  }
+
+  // EXPERIMENTAL
+  // Overload of the CreateColumnFamilyWithImport() that allows the caller to
+  // pass a list of ExportImportFilesMetaData pointers to support creating
+  // ColumnFamily by importing multiple ColumnFamilies.
+  // It should be noticed that if the user keys of the imported column families
+  // overlap with each other, an error will be returned.
+  virtual Status CreateColumnFamilyWithImport(
+      const ColumnFamilyOptions& options, const std::string& column_family_name,
+      const ImportColumnFamilyOptions& import_options,
+      const std::vector<const ExportImportFilesMetaData*>& metadatas,
       ColumnFamilyHandle** handle) = 0;

   // EXPERIMENTAL
   // ClipColumnFamily() will clip the entries in the CF according to the range
-  // [begin_key,
-  // end_key).
-  // Returns OK on success, and a non-OK status on error.
+  // [begin_key, end_key). Returns OK on success, and a non-OK status on error.
   // Any entries outside this range will be completely deleted (including
   // tombstones).
   // The main difference between ClipColumnFamily(begin, end) and

@@ -1797,8 +1810,7 @@ class DB {
   // is that the former physically deletes all keys outside the range, but is
   // more heavyweight than the latter.
   // This feature is mainly used to ensure that there is no overlapping Key when
-  // calling
-  // CreateColumnFamilyWithImports() to import multiple CFs.
+  // calling CreateColumnFamilyWithImport() to import multiple CFs.
   // Note that: concurrent updates cannot be performed during Clip.
   virtual Status ClipColumnFamily(ColumnFamilyHandle* column_family,
                                   const Slice& begin_key,

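The ClipColumnFamily() comment above points at the intended workflow: clip each source CF to a disjoint range before exporting, so the multi-CF import cannot hit the overlap error. A hedged sketch of that step, assuming a ClipColumnFamily(handle, begin_key, end_key) signature; the key bounds and function name are purely illustrative.

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Assumes db1/db2 each hold one source CF whose edges may overlap.
Status ClipBeforeImport(DB* db1, ColumnFamilyHandle* cfh1,
                        DB* db2, ColumnFamilyHandle* cfh2) {
  // After clipping, db1's CF only holds keys in ["a", "m") and db2's CF
  // only holds keys in ["m", "z"), so their user-key ranges are disjoint
  // and a subsequent multi-CF CreateColumnFamilyWithImport() can succeed.
  Status s = db1->ClipColumnFamily(cfh1, Slice("a"), Slice("m"));
  if (s.ok()) {
    s = db2->ClipColumnFamily(cfh2, Slice("m"), Slice("z"));
  }
  return s;
}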
@@ -172,10 +172,10 @@ class StackableDB : public DB {
   virtual Status CreateColumnFamilyWithImport(
       const ColumnFamilyOptions& options, const std::string& column_family_name,
       const ImportColumnFamilyOptions& import_options,
-      const ExportImportFilesMetaData& metadata,
+      const std::vector<const ExportImportFilesMetaData*>& metadatas,
       ColumnFamilyHandle** handle) override {
     return db_->CreateColumnFamilyWithImport(options, column_family_name,
-                                             import_options, metadata, handle);
+                                             import_options, metadatas, handle);
   }

   using DB::ClipColumnFamily;

@@ -0,0 +1 @@
+Overload the API CreateColumnFamilyWithImport() to support creating ColumnFamily by importing multiple ColumnFamilies. It requires that CFs should not overlap in user key range.