mirror of https://github.com/facebook/rocksdb.git
Support ingesting SST files generated by a live DB (#12750)
Summary: ... to enable use cases like using RocksDB to merge sort data for ingestion. A new file ingestion option `IngestExternalFileOptions::allow_db_generated_files` is introduced to allow users to ingest SST files generated by live DBs instead of SstFileWriter. For now this only works if the SST files being ingested have zero as their largest sequence number AND do not overlap with any data in the DB (so we can assign seqno 0, which matches the seqno of all ingested keys). The option is marked as experimental for now.

Main changes needed to enable this:
- ignore CF id mismatch during ingestion
- ignore the missing external file version table property

The rest of the change is mostly new unit tests. A previous attempt is in https://github.com/facebook/rocksdb/issues/5602.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12750

Test Plan: new unit tests

Reviewed By: ajkr, jowlyzhang

Differential Revision: D58396673

Pulled By: cbi42

fbshipit-source-id: aae513afad7b1ff5d4faa48104df5f384926bf03
This commit is contained in:
parent 0fca5e31b4
commit 4384dd5eee
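As a rough illustration of the workflow described in the summary (not part of the commit itself), the sketch below merge-sorts data in a scratch DB, compacts it so every key carries sequence number zero, and then ingests the resulting SST files into a target DB. The function name is hypothetical, the scratch data is assumed not to overlap the target DB, and error handling is minimal.

#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/metadata.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

// Sketch: merge-sort data in a scratch DB, then ingest its SST files into a
// target DB. Assumes the scratch data does not overlap the target DB and that
// no live snapshots prevent sequence numbers from being zeroed out.
Status IngestFromScratchDB(DB* scratch_db, DB* target_db) {
  // Compact everything to the last level so all keys end up with seqno 0.
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  Status s = scratch_db->CompactRange(cro, nullptr, nullptr);
  if (!s.ok()) {
    return s;
  }

  // Collect the live SST file paths from the scratch DB.
  std::vector<LiveFileMetaData> files;
  scratch_db->GetLiveFilesMetaData(&files);
  std::vector<std::string> paths;
  for (const auto& f : files) {
    paths.push_back(f.directory + "/" + f.relative_filename);
  }

  // Ingest them into the target DB as DB-generated files.
  IngestExternalFileOptions opts;
  opts.allow_db_generated_files = true;  // experimental
  opts.move_files = false;               // required by the new option
  opts.write_global_seqno = false;       // required by the new option
  return target_db->IngestExternalFile(paths, opts);
}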
@@ -5814,6 +5814,18 @@ Status DBImpl::IngestExternalFiles(
             "timestamps enabled doesn't support ingest behind.");
       }
     }
+    if (ingest_opts.allow_db_generated_files) {
+      if (ingest_opts.write_global_seqno) {
+        return Status::NotSupported(
+            "write_global_seqno is deprecated and does not work with "
+            "allow_db_generated_files.");
+      }
+      if (ingest_opts.move_files) {
+        return Status::NotSupported(
+            "Options move_files and allow_db_generated_files are not "
+            "compatible.");
+      }
+    }
   }
 
   // TODO (yanqin) maybe handle the case in which column_families have
@@ -565,6 +565,11 @@ Options DBTestBase::GetOptions(
       options.unordered_write = false;
       break;
     }
+    case kBlockBasedTableWithBinarySearchWithFirstKeyIndex: {
+      table_options.index_type =
+          BlockBasedTableOptions::kBinarySearchWithFirstKey;
+      break;
+    }
 
     default:
       break;
@@ -1041,6 +1041,7 @@ class DBTestBase : public testing::Test {
     kPartitionedFilterWithNewTableReaderForCompactions,
     kUniversalSubcompactions,
     kUnorderedWrite,
+    kBlockBasedTableWithBinarySearchWithFirstKeyIndex,
     // This must be the last line
     kEnd,
   };
@@ -44,9 +44,12 @@ Status ExternalSstFileIngestionJob::Prepare(
       return status;
     }
 
+    // Files generated in another DB or CF may have a different column family
+    // ID, so we let it pass here.
     if (file_to_ingest.cf_id !=
             TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
-        file_to_ingest.cf_id != cfd_->GetID()) {
+        file_to_ingest.cf_id != cfd_->GetID() &&
+        !ingestion_options_.allow_db_generated_files) {
       return Status::InvalidArgument(
           "External file column family id don't match");
     }
@@ -111,6 +114,7 @@ Status ExternalSstFileIngestionJob::Prepare(
     const std::string path_inside_db = TableFileName(
         cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
     if (ingestion_options_.move_files) {
+      assert(!ingestion_options_.allow_db_generated_files);
       status =
           fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
       if (status.ok()) {
@@ -704,9 +708,16 @@ Status ExternalSstFileIngestionJob::SanityCheckTableProperties(
   // Get table version
   auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
   if (version_iter == uprops.end()) {
+    if (!ingestion_options_.allow_db_generated_files) {
       return Status::Corruption("External file version not found");
+    } else {
+      // 0 is special version for when a file from live DB does not have the
+      // version table property
+      file_to_ingest->version = 0;
     }
+  } else {
     file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
+  }
 
   auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
   if (file_to_ingest->version == 2) {
@@ -733,8 +744,15 @@ Status ExternalSstFileIngestionJob::SanityCheckTableProperties(
       return Status::InvalidArgument(
           "External SST file V1 does not support global seqno");
     }
+  } else if (file_to_ingest->version == 0) {
+    // allow_db_generated_files is true
+    assert(seqno_iter == uprops.end());
+    file_to_ingest->original_seqno = 0;
+    file_to_ingest->global_seqno_offset = 0;
   } else {
-    return Status::InvalidArgument("External file version is not supported");
+    return Status::InvalidArgument("External file version " +
+                                   std::to_string(file_to_ingest->version) +
+                                   " is not supported");
   }
 
   file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
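For context on the version handling above (an illustrative sketch, not part of the diff): files written by SstFileWriter carry the `rocksdb.external_sst_file.version` table property (version 1 or 2), while files produced by a live DB do not, which is why such files are treated as version 0 when allow_db_generated_files is set. One way to check which kind of file you have, assuming SstFileReader can open it with default Options:

#include <iostream>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/sst_file_reader.h"
#include "rocksdb/sst_file_writer.h"  // ExternalSstFilePropertyNames

using namespace ROCKSDB_NAMESPACE;

// Returns true if the file carries the SstFileWriter version property.
bool HasExternalFileVersion(const std::string& file_path) {
  SstFileReader reader{Options()};
  Status s = reader.Open(file_path);
  if (!s.ok()) {
    std::cerr << "open failed: " << s.ToString() << std::endl;
    return false;
  }
  const auto& uprops = reader.GetTableProperties()->user_collected_properties;
  // Present for SstFileWriter output; absent for files written by a live DB,
  // which this PR treats as "version 0".
  return uprops.find(ExternalSstFilePropertyNames::kVersion) != uprops.end();
}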
@@ -896,6 +914,25 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
     } else if (!iter->status().ok()) {
       return iter->status();
     }
+    if (ingestion_options_.allow_db_generated_files) {
+      // Verify that all keys have seqno zero.
+      // TODO: store largest seqno in table property and validate it instead.
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        Status pik_status =
+            ParseInternalKey(iter->key(), &key, allow_data_in_errors);
+        if (!pik_status.ok()) {
+          return Status::Corruption("Corrupted key in external file. ",
+                                    pik_status.getState());
+        }
+        if (key.sequence != 0) {
+          return Status::NotSupported(
+              "External file has a key with non zero sequence number.");
+        }
+      }
+      if (!iter->status().ok()) {
+        return iter->status();
+      }
+    }
 
     std::unique_ptr<InternalIterator> range_del_iter(
         table_reader->NewRangeTombstoneIterator(ro));
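Because this validation scans every key in the file, a caller can cheaply pre-filter candidates on the source DB using the largest_seqno already tracked in its file metadata. A small sketch, not part of the diff, assuming the files come from the DB being queried:

#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/metadata.h"

using namespace ROCKSDB_NAMESPACE;

// Collect paths of live SST files whose keys are all at sequence number 0,
// i.e. the only files that allow_db_generated_files ingestion can accept.
std::vector<std::string> ZeroSeqnoFiles(DB* source_db) {
  std::vector<LiveFileMetaData> files;
  source_db->GetLiveFilesMetaData(&files);
  std::vector<std::string> out;
  for (const auto& f : files) {
    if (f.largest_seqno == 0) {  // no key in this file has a nonzero seqno
      out.push_back(f.directory + "/" + f.relative_filename);
    }
  }
  return out;
}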
@@ -911,6 +948,11 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
         return Status::Corruption("Corrupted key in external file. ",
                                   pik_status.getState());
       }
+      if (key.sequence != 0) {
+        return Status::Corruption(
+            "External file has a range deletion with non zero sequence "
+            "number.");
+      }
       RangeTombstone tombstone(key, range_del_iter->value());
 
       InternalKey start_key = tombstone.SerializeKey();
@@ -1045,11 +1087,18 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
           "Column family enables user-defined timestamps, please make sure the "
           "key range (without timestamp) of external file does not overlap "
           "with key range (without timestamp) in the db");
+      return status;
     }
     if (*assigned_seqno == 0) {
       *assigned_seqno = last_seqno + 1;
     }
   }
+
+  if (ingestion_options_.allow_db_generated_files && *assigned_seqno != 0) {
+    return Status::InvalidArgument(
+        "An ingested file is assigned to a non-zero sequence number, which is "
+        "incompatible with ingestion option allow_db_generated_files.");
+  }
   return status;
 }
 
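One practical consequence of this check, sketched below (not part of the diff; assumes the files hold only zero-seqno keys and do not overlap the target DB): if snapshot_consistency is enabled while a snapshot is held, RocksDB assigns a fresh non-zero sequence number and the ingestion is rejected, so either release snapshots first or turn snapshot_consistency off.

#include <string>
#include <vector>

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper: ingest DB-generated files while snapshots may be held.
// With snapshot_consistency on and a live snapshot, a fresh (non-zero) seqno
// would be assigned and the check above would reject the files, so this sketch
// disables it and relies on the files not overlapping existing keys.
Status IngestIgnoringSnapshots(DB* db, const std::vector<std::string>& paths) {
  IngestExternalFileOptions opts;
  opts.allow_db_generated_files = true;
  opts.snapshot_consistency = false;
  return db->IngestExternalFile(paths, opts);
}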
@@ -3719,10 +3719,7 @@ TEST_F(ExternalSSTFileWithTimestampTest, TimestampsNotPersistedBasic) {
 }
 
 INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
-                        testing::Values(std::make_tuple(false, false),
-                                        std::make_tuple(false, true),
-                                        std::make_tuple(true, false),
-                                        std::make_tuple(true, true)));
+                        testing::Combine(testing::Bool(), testing::Bool()));
 
 INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
                         ExternSSTFileLinkFailFallbackTest,
@@ -3730,6 +3727,339 @@ INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest,
                         std::make_tuple(true, true),
                         std::make_tuple(false, false)));
 
+class IngestDBGeneratedFileTest : public ExternalSSTFileTestBase,
+                                  public ::testing::WithParamInterface<bool> {
+ public:
+  IngestDBGeneratedFileTest() {
+    ingest_opts.allow_db_generated_files = true;
+    ingest_opts.move_files = false;
+    ingest_opts.verify_checksums_before_ingest = GetParam();
+    ingest_opts.snapshot_consistency = false;
+  }
+
+ protected:
+  IngestExternalFileOptions ingest_opts;
+};
+
+INSTANTIATE_TEST_CASE_P(BasicMultiConfig, IngestDBGeneratedFileTest,
+                        testing::Bool());
+
+TEST_P(IngestDBGeneratedFileTest, FailureCase) {
+  // Ingesting overlapping data should always fail.
+  do {
+    SCOPED_TRACE("option_config_ = " + std::to_string(option_config_));
+
+    Options options = CurrentOptions();
+    CreateAndReopenWithCF({"toto"}, options);
+    // Fill CFs with overlapping keys. Will try to ingest CF1 into default CF.
+    for (int k = 0; k < 50; ++k) {
+      ASSERT_OK(Put(Key(k), "default_cf_" + Key(k)));
+    }
+    for (int k = 49; k < 100; ++k) {
+      ASSERT_OK(Put(1, Key(k), "cf1_" + Key(k)));
+    }
+    ASSERT_OK(Flush(/*cf=*/1));
+    {
+      // Verify that largest key of the file has non-zero seqno.
+      std::vector<std::vector<FileMetaData>> metadata;
+      dbfull()->TEST_GetFilesMetaData(handles_[1], &metadata, nullptr);
+      const FileMetaData& file = metadata[0][0];
+      ValueType vtype;
+      SequenceNumber seq;
+      UnPackSequenceAndType(ExtractInternalKeyFooter(file.largest.Encode()),
+                            &seq, &vtype);
+      ASSERT_GE(seq, 0);
+    }
+    std::vector<LiveFileMetaData> live_meta;
+    db_->GetLiveFilesMetaData(&live_meta);
+    ASSERT_EQ(live_meta.size(), 1);
+    std::vector<std::string> to_ingest_files;
+    to_ingest_files.emplace_back(live_meta[0].directory + "/" +
+                                 live_meta[0].relative_filename);
+    // Ingesting a file whose boundary key has non-zero seqno.
+    Status s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    ASSERT_TRUE(
+        s.ToString().find("External file has non zero sequence number") !=
+        std::string::npos);
+    ASSERT_NOK(s);
+
+    {
+      // Only non-boundary key with non-zero seqno.
+      const Snapshot* snapshot = db_->GetSnapshot();
+      ASSERT_OK(Put(1, Key(70), "cf1_" + Key(70)));
+      ASSERT_OK(Flush(1));
+      CompactRangeOptions cro;
+      cro.bottommost_level_compaction =
+          BottommostLevelCompaction::kForceOptimized;
+      ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
+
+      // Verify that only the non-boundary key of the file has non-zero seqno.
+      std::vector<std::vector<FileMetaData>> metadata;
+      // File may be at different level for different options.
+      dbfull()->TEST_GetFilesMetaData(handles_[1], &metadata, nullptr);
+      bool found_file = false;
+      for (const auto& level : metadata) {
+        if (level.empty()) {
+          continue;
+        }
+        ASSERT_FALSE(found_file);
+        found_file = true;
+        ASSERT_EQ(1, level.size());
+        const FileMetaData& file = level[0];
+        ValueType vtype;
+        SequenceNumber seq;
+        UnPackSequenceAndType(ExtractInternalKeyFooter(file.largest.Encode()),
+                              &seq, &vtype);
+        ASSERT_EQ(seq, 0);
+        UnPackSequenceAndType(ExtractInternalKeyFooter(file.smallest.Encode()),
+                              &seq, &vtype);
+        ASSERT_EQ(seq, 0);
+        ASSERT_GT(file.fd.largest_seqno, 0);
+      }
+      ASSERT_TRUE(found_file);
+      live_meta.clear();
+      db_->GetLiveFilesMetaData(&live_meta);
+      ASSERT_EQ(live_meta.size(), 1);
+      to_ingest_files[0] =
+          live_meta[0].directory + "/" + live_meta[0].relative_filename;
+      s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+      ASSERT_NOK(s);
+      ASSERT_TRUE(
+          s.ToString().find(
+              "External file has a key with non zero sequence number") !=
+          std::string::npos);
+      db_->ReleaseSnapshot(snapshot);
+    }
+
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction =
+        BottommostLevelCompaction::kForceOptimized;
+    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
+    live_meta.clear();
+    db_->GetLiveFilesMetaData(&live_meta);
+    ASSERT_EQ(live_meta.size(), 1);
+    ASSERT_EQ(0, live_meta[0].largest_seqno);
+    to_ingest_files[0] =
+        live_meta[0].directory + "/" + live_meta[0].relative_filename;
+
+    ingest_opts.allow_db_generated_files = false;
+    // Ingesting a DB-generated file with allow_db_generated_files = false.
+    s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    ASSERT_TRUE(s.ToString().find("External file version not found") !=
+                std::string::npos);
+    ASSERT_NOK(s);
+
+    const std::string err =
+        "An ingested file is assigned to a non-zero sequence number, which is "
+        "incompatible with ingestion option allow_db_generated_files";
+    ingest_opts.allow_db_generated_files = true;
+    s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    ASSERT_TRUE(s.ToString().find(err) != std::string::npos);
+    ASSERT_NOK(s);
+    if (options.compaction_style != kCompactionStyleUniversal) {
+      // FIXME: after fixing ingestion with universal compaction, currently
+      // will always ingest into L0.
+      ingest_opts.fail_if_not_bottommost_level = true;
+      s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+      ASSERT_NOK(s);
+      ASSERT_TRUE(s.ToString().find("Files cannot be ingested to Lmax") !=
+                  std::string::npos);
+      ingest_opts.fail_if_not_bottommost_level = false;
+    }
+    ingest_opts.write_global_seqno = true;
+    s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    ASSERT_TRUE(s.ToString().find("write_global_seqno is deprecated and does "
+                                  "not work with allow_db_generated_files") !=
+                std::string::npos);
+    ASSERT_NOK(s);
+    ingest_opts.write_global_seqno = false;
+
+    // Delete the overlapping key.
+    ASSERT_OK(db_->Delete(WriteOptions(), handles_[1], Key(49)));
+    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
+    live_meta.clear();
+    db_->GetLiveFilesMetaData(&live_meta);
+    bool cf1_file_found = false;
+    for (const auto& f : live_meta) {
+      if (f.column_family_name == "toto") {
+        ASSERT_FALSE(cf1_file_found);
+        cf1_file_found = true;
+        ASSERT_EQ(0, f.largest_seqno);
+        to_ingest_files[0] = f.directory + "/" + f.relative_filename;
+      }
+    }
+    ASSERT_TRUE(cf1_file_found);
+
+    const Snapshot* snapshot = db_->GetSnapshot();
+    ingest_opts.snapshot_consistency = true;
+    s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    // snapshot_consistency with snapshot will assign a newest sequence number.
+    ASSERT_TRUE(s.ToString().find(err) != std::string::npos);
+    ASSERT_NOK(s);
+
+    ingest_opts.move_files = true;
+    s = db_->IngestExternalFile(to_ingest_files, ingest_opts);
+    ingest_opts.move_files = false;
+    ASSERT_TRUE(
+        s.ToString().find("Options move_files and allow_db_generated_files are "
+                          "not compatible") != std::string::npos);
+    ASSERT_NOK(s);
+
+    ingest_opts.snapshot_consistency = false;
+    ASSERT_OK(db_->IngestExternalFile(to_ingest_files, ingest_opts));
+    db_->ReleaseSnapshot(snapshot);
+
+    // Verify default CF content.
+    std::string val;
+    for (int k = 0; k < 100; ++k) {
+      ASSERT_OK(db_->Get(ReadOptions(), Key(k), &val));
+      if (k < 50) {
+        ASSERT_EQ(val, "default_cf_" + Key(k));
+      } else {
+        ASSERT_EQ(val, "cf1_" + Key(k));
+      }
+    }
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
+}
+
+class IngestDBGeneratedFileTest2
+    : public ExternalSSTFileTestBase,
+      public ::testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
+ public:
+  IngestDBGeneratedFileTest2() = default;
+};
+
+INSTANTIATE_TEST_CASE_P(VaryingOptions, IngestDBGeneratedFileTest2,
+                        testing::Combine(testing::Bool(), testing::Bool(),
+                                         testing::Bool(), testing::Bool()));
+
+TEST_P(IngestDBGeneratedFileTest2, NotOverlapWithDB) {
+  // Use a separate column family to sort some data, generate multiple SST
+  // files. Then ingest these files into another column family or DB. The data
+  // to be ingested does not overlap with existing data.
+  IngestExternalFileOptions ingest_opts;
+  ingest_opts.allow_db_generated_files = true;
+  ingest_opts.move_files = false;
+  ingest_opts.snapshot_consistency = std::get<0>(GetParam());
+  ingest_opts.allow_global_seqno = std::get<1>(GetParam());
+  ingest_opts.allow_blocking_flush = std::get<2>(GetParam());
+  ingest_opts.fail_if_not_bottommost_level = std::get<3>(GetParam());
+
+  do {
+    SCOPED_TRACE("option_config_ = " + std::to_string(option_config_));
+    Options options = CurrentOptions();
+    // vector memtable for temp CF does not support concurrent write
+    options.allow_concurrent_memtable_write = false;
+    CreateAndReopenWithCF({"toto"}, options);
+
+    // non-empty bottommost level
+    WriteOptions wo;
+    for (int k = 0; k < 50; ++k) {
+      ASSERT_OK(db_->Put(wo, handles_[1], Key(k), "base_val_" + Key(k)));
+    }
+    ASSERT_OK(Flush());
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction =
+        BottommostLevelCompaction::kForceOptimized;
+    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
+    // non-empty memtable
+    for (int k = 50; k < 100; ++k) {
+      ASSERT_OK(db_->Put(wo, handles_[1], Key(k), "base_val_" + Key(k)));
+    }
+
+    // load external data to sort, generate multiple files
+    Options temp_cf_opts;
+    ColumnFamilyHandle* temp_cfh;
+    temp_cf_opts.target_file_size_base = 4 << 10;
+    temp_cf_opts.memtable_factory.reset(new VectorRepFactory());
+    temp_cf_opts.allow_concurrent_memtable_write = false;
+    temp_cf_opts.compaction_style = kCompactionStyleUniversal;
+    ASSERT_OK(db_->CreateColumnFamily(temp_cf_opts, "temp_cf", &temp_cfh));
+
+    Random rnd(301);
+    std::vector<std::string> expected_value;
+    expected_value.resize(100);
+    // Out of order insertion of keys from 100 to 199.
+    for (int k = 99; k >= 0; --k) {
+      expected_value[k] = rnd.RandomString(200);
+      ASSERT_OK(db_->Put(wo, temp_cfh, Key(k + 100), expected_value[k]));
+    }
+    ASSERT_OK(db_->CompactRange(cro, temp_cfh, nullptr, nullptr));
+    std::vector<std::string> sst_file_paths;
+    ColumnFamilyMetaData cf_meta;
+    db_->GetColumnFamilyMetaData(temp_cfh, &cf_meta);
+    ASSERT_GT(cf_meta.file_count, 1);
+    for (const auto& level_meta : cf_meta.levels) {
+      if (level_meta.level + 1 < temp_cf_opts.num_levels) {
+        ASSERT_EQ(0, level_meta.files.size());
+      } else {
+        ASSERT_GT(level_meta.files.size(), 1);
+        for (const auto& meta : level_meta.files) {
+          ASSERT_EQ(0, meta.largest_seqno);
+          sst_file_paths.emplace_back(meta.directory + "/" +
+                                      meta.relative_filename);
+        }
+      }
+    }
+
+    ASSERT_OK(
+        db_->IngestExternalFile(handles_[1], sst_file_paths, ingest_opts));
+    // Verify state of the CF1
+    ReadOptions ro;
+    std::string val;
+    for (int k = 0; k < 100; ++k) {
+      ASSERT_OK(db_->Get(ro, handles_[1], Key(k), &val));
+      ASSERT_EQ(val, "base_val_" + Key(k));
+      ASSERT_OK(db_->Get(ro, handles_[1], Key(100 + k), &val));
+      ASSERT_EQ(val, expected_value[k]);
+    }
+
+    // Ingest into another DB.
+    if (!encrypted_env_) {
+      // Ingestion between encrypted env and non-encrypted env won't work.
+      std::string db2_path = test::PerThreadDBPath("DB2");
+      Options db2_options;
+      db2_options.create_if_missing = true;
+      DB* db2 = nullptr;
+      ASSERT_OK(DB::Open(db2_options, db2_path, &db2));
+      // Write some base data.
+      expected_value.emplace_back(rnd.RandomString(100));
+      ASSERT_OK(db2->Put(WriteOptions(), Key(200), expected_value.back()));
+      ASSERT_OK(db2->CompactRange(cro, nullptr, nullptr));
+      expected_value.emplace_back(rnd.RandomString(100));
+      ASSERT_OK(db2->Put(WriteOptions(), Key(201), expected_value.back()));
+
+      ASSERT_OK(db2->IngestExternalFile({sst_file_paths}, ingest_opts));
+      {
+        std::unique_ptr<Iterator> iter{db2->NewIterator(ReadOptions())};
+        iter->SeekToFirst();
+        // The DB should have keys 100-199 from ingested files, and keys 200 and
+        // 201 from itself.
+        for (int k = 100; k <= 201; ++k, iter->Next()) {
+          ASSERT_TRUE(iter->Valid());
+          ASSERT_EQ(iter->key(), Key(k));
+          ASSERT_EQ(iter->value(), expected_value[k - 100]);
+        }
+        ASSERT_FALSE(iter->Valid());
+        ASSERT_OK(iter->status());
+      }
+
+      // Dropping the original CF should not affect db2, reopening it should not
+      // miss SST files.
+      ASSERT_OK(db_->DropColumnFamily(temp_cfh));
+      ASSERT_OK(db_->DestroyColumnFamilyHandle(temp_cfh));
+      ASSERT_OK(db2->Close());
+      delete db2;
+      ASSERT_OK(DB::Open(db2_options, db2_path, &db2));
+      ASSERT_OK(db2->Close());
+      delete db2;
+      ASSERT_OK(DestroyDB(db2_path, db2_options));
+    } else {
+      ASSERT_OK(db_->DropColumnFamily(temp_cfh));
+      ASSERT_OK(db_->DestroyColumnFamilyHandle(temp_cfh));
+    }
+  } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
@@ -255,7 +255,6 @@ bool VersionEdit::EncodeTo(std::string* dst,
         char p = static_cast<char>(0);
         PutLengthPrefixedSlice(dst, Slice(&p, 1));
       }
-
       TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
                                dst);
 
@@ -2179,6 +2179,24 @@ struct IngestExternalFileOptions {
   //
   // XXX: "bottommost" is obsolete/confusing terminology to refer to last level
   bool fail_if_not_bottommost_level = false;
+  // EXPERIMENTAL
+  // If set to true, ingestion will
+  // - allow the files to not be generated by SstFileWriter, and
+  // - ignore cf_id mismatch between cf_id in the files and the CF they are
+  //   being ingested into.
+  //
+  // REQUIRES:
+  // - files to be ingested do not overlap with existing keys.
+  // - write_global_seqno = false
+  // - move_files = false
+  //
+  // Warning: This ONLY works for SST files where all keys have sequence number
+  // zero and with no duplicated user keys (this should be guaranteed if the
+  // file is generated by a DB with zero as the largest sequence number).
+  // We scan the entire SST files to validate sequence numbers.
+  // Warning: If a DB contains ingested files generated by another DB/CF,
+  // RepairDB() may not correctly recover these files. It may lose these files.
+  bool allow_db_generated_files = false;
 };
 
 enum TraceFilterType : uint64_t {
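To make the REQUIRES list above concrete, here is a hedged sketch (not part of the diff) of the option combinations that are rejected; it assumes db and paths are valid and that the files satisfy the zero-seqno, non-overlapping requirements:

#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

// Demonstrates which combinations the new option refuses.
void ShowIncompatibleOptions(DB* db, const std::vector<std::string>& paths) {
  IngestExternalFileOptions opts;
  opts.allow_db_generated_files = true;

  opts.move_files = true;  // incompatible: files must be copied, not moved
  assert(db->IngestExternalFile(paths, opts).IsNotSupported());
  opts.move_files = false;

  opts.write_global_seqno = true;  // incompatible: deprecated, rejected
  assert(db->IngestExternalFile(paths, opts).IsNotSupported());
  opts.write_global_seqno = false;

  // With both left at false, the ingestion is allowed to proceed.
  Status s = db->IngestExternalFile(paths, opts);
  (void)s;
}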
@@ -478,6 +478,7 @@ bool IsFeatureSupported(const TableProperties& table_properties,
 }
 
 // Caller has to ensure seqno is not nullptr.
+// Set *seqno to the global sequence number for reading this file.
 Status GetGlobalSequenceNumber(const TableProperties& table_properties,
                                SequenceNumber largest_seqno,
                                SequenceNumber* seqno) {
@@ -500,12 +501,17 @@ Status GetGlobalSequenceNumber(const TableProperties& table_properties,
   }
 
   uint32_t version = DecodeFixed32(version_pos->second.c_str());
-  if (version < 2) {
-    if (seqno_pos != props.end() || version != 1) {
+  if (version != 2) {
     std::array<char, 200> msg_buf;
+    if (version != 1) {
+      snprintf(msg_buf.data(), msg_buf.max_size(),
+               "An external sst file has corrupted version %u.", version);
+      return Status::Corruption(msg_buf.data());
+    }
+    if (seqno_pos != props.end()) {
       // This is a v1 external sst file, global_seqno is not supported.
       snprintf(msg_buf.data(), msg_buf.max_size(),
-               "An external sst file with version %u have global seqno "
+               "An external sst file with version %u has global seqno "
                "property with value %s",
                version, seqno_pos->second.c_str());
       return Status::Corruption(msg_buf.data());
@@ -0,0 +1 @@
+* Add support to ingest SST files generated by a DB instead of SstFileWriter. This can be enabled with experimental option `IngestExternalFileOptions::allow_db_generated_files`.