rocksdb/db/import_column_family_test.cc
Changyu Bi 9aa3b6f9ae Support range deletion tombstones in CreateColumnFamilyWithImport (#11252)
Summary:
CreateColumnFamilyWithImport() did not support range tombstones for two reasons:
1. it uses point keys of an input file to determine its boundary (smallest and largest internal key), which means range tombstones outside of the point key range will be effectively dropped.
2. it does not handle files with no point keys.

Also included a fix in external_sst_file_ingestion_job.cc where the blocks read in `GetIngestedFileInfo()` can be added to block cache now (issue fixed in https://github.com/facebook/rocksdb/pull/6429).

This PR adds support for exporting and importing column family with range tombstones. The main change is to add smallest internal key and largest internal key to `SstFileMetaData` that will be part of the output of `ExportColumnFamily()`. Then during `CreateColumnFamilyWithImport(...,const ExportImportFilesMetaData& metadata,...)`, file boundaries can be set from `metadata` directly. This is needed since when file boundaries are extended by range tombstones, sometimes they cannot be deduced from a file's content alone.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11252

Test Plan:
- added unit tests that fail before this change

Closes https://github.com/facebook/rocksdb/issues/11245

Reviewed By: ajkr

Differential Revision: D43577443

Pulled By: cbi42

fbshipit-source-id: 6bff78e583cc50c44854994dea0a8dd519398f2f
2023-03-13 11:06:59 -07:00

747 lines
26 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <functional>
#include <memory>

#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h"
#include "test_util/testutil.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
class ImportColumnFamilyTest : public DBTestBase {
public:
ImportColumnFamilyTest()
: DBTestBase("import_column_family_test", /*env_do_fsync=*/true) {
sst_files_dir_ = dbname_ + "/sst_files/";
export_files_dir_ = test::PerThreadDBPath(env_, "export");
DestroyAndRecreateExternalSSTFilesDir();
import_cfh_ = nullptr;
import_cfh2_ = nullptr;
metadata_ptr_ = nullptr;
}
~ImportColumnFamilyTest() {
if (import_cfh_) {
EXPECT_OK(db_->DropColumnFamily(import_cfh_));
EXPECT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
import_cfh_ = nullptr;
}
if (import_cfh2_) {
EXPECT_OK(db_->DropColumnFamily(import_cfh2_));
EXPECT_OK(db_->DestroyColumnFamilyHandle(import_cfh2_));
import_cfh2_ = nullptr;
}
if (metadata_ptr_) {
delete metadata_ptr_;
metadata_ptr_ = nullptr;
}
EXPECT_OK(DestroyDir(env_, sst_files_dir_));
EXPECT_OK(DestroyDir(env_, export_files_dir_));
}
void DestroyAndRecreateExternalSSTFilesDir() {
EXPECT_OK(DestroyDir(env_, sst_files_dir_));
EXPECT_OK(env_->CreateDir(sst_files_dir_));
EXPECT_OK(DestroyDir(env_, export_files_dir_));
}
LiveFileMetaData LiveFileMetaDataInit(std::string name, std::string path,
int level,
SequenceNumber smallest_seqno,
SequenceNumber largest_seqno) {
LiveFileMetaData metadata;
metadata.name = name;
metadata.db_path = path;
metadata.smallest_seqno = smallest_seqno;
metadata.largest_seqno = largest_seqno;
metadata.level = level;
return metadata;
}
protected:
std::string sst_files_dir_;
std::string export_files_dir_;
ColumnFamilyHandle* import_cfh_;
ColumnFamilyHandle* import_cfh2_;
ExportImportFilesMetaData* metadata_ptr_;
};
// Imports SST files produced by SstFileWriter -- one written by a writer bound
// to an existing column family and one by a writer bound to none -- each into
// a brand new column family, and checks the imported data is readable.
TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFiles) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  // cf1.sst: written by a writer bound to column family "koko".
  SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
  const std::string cf1_sst_name = "cf1.sst";
  const std::string cf1_sst_path = sst_files_dir_ + cf1_sst_name;
  ASSERT_OK(sfw_cf1.Open(cf1_sst_path));
  ASSERT_OK(sfw_cf1.Put("K1", "V1"));
  ASSERT_OK(sfw_cf1.Put("K2", "V2"));
  ASSERT_OK(sfw_cf1.Finish());

  // cf_unknown.sst: written by a writer that is not bound to a column family.
  SstFileWriter sfw_unknown(EnvOptions(), options);
  const std::string unknown_sst_name = "cf_unknown.sst";
  const std::string unknown_sst_path = sst_files_dir_ + unknown_sst_name;
  ASSERT_OK(sfw_unknown.Open(unknown_sst_path));
  ASSERT_OK(sfw_unknown.Put("K3", "V1"));
  ASSERT_OK(sfw_unknown.Put("K4", "V2"));
  ASSERT_OK(sfw_unknown.Finish());

  {
    // Import cf1.sst into the new column family "toto" and verify its keys.
    ExportImportFilesMetaData import_meta;
    import_meta.db_comparator_name = options.comparator->Name();
    import_meta.files.push_back(LiveFileMetaDataInit(
        cf1_sst_name, sst_files_dir_, /*level=*/0, /*smallest_seqno=*/10,
        /*largest_seqno=*/19));
    ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);

    std::string read_value;
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K1", &read_value));
    ASSERT_EQ(read_value, "V1");
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K2", &read_value));
    ASSERT_EQ(read_value, "V2");

    // "toto" is dropped again so the handle can be reused for the next import.
    ASSERT_OK(db_->DropColumnFamily(import_cfh_));
    ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
    import_cfh_ = nullptr;
  }

  {
    // Import cf_unknown.sst into the new column family "yoyo" and verify.
    ExportImportFilesMetaData import_meta;
    import_meta.db_comparator_name = options.comparator->Name();
    import_meta.files.push_back(LiveFileMetaDataInit(
        unknown_sst_name, sst_files_dir_, /*level=*/0, /*smallest_seqno=*/20,
        /*largest_seqno=*/29));
    ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);

    std::string read_value;
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K3", &read_value));
    ASSERT_EQ(read_value, "V1");
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K4", &read_value));
    ASSERT_EQ(read_value, "V2");
  }

  // Only the handle is released here; "yoyo" itself is kept because the
  // reopen below lists it among the column families to open.
  EXPECT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
  import_cfh_ = nullptr;

  // verify sst unique id during reopen
  options.verify_sst_unique_id_in_manifest = true;
  ReopenWithColumnFamilies({"default", "koko", "yoyo"}, options);
}
// Imports six SstFileWriter files with overlapping key ranges, placed on
// levels 3 down to 0, and checks which version of every key is visible right
// after the import, after an additional write + flush, and after a full
// compaction.
TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  // One entry per SST file to generate. The file receives the keys
  // Key(begin), Key(begin + step), ... below Key(end), each with
  // `value_suffix` appended to the key as its value, and is then registered
  // for import at `level` with the given sequence number range.
  struct SstSpec {
    const char* file_name;
    int begin;
    int end;
    int step;
    const char* value_suffix;
    int level;
    SequenceNumber smallest_seqno;
    SequenceNumber largest_seqno;
  };
  const SstSpec kSstSpecs[] = {
      {"file3.sst", 0, 100, 1, "_val", 3, 10, 19},
      {"file2.sst", 0, 100, 2, "_overwrite1", 2, 20, 29},
      {"file1a.sst", 0, 52, 4, "_overwrite2", 1, 30, 34},
      {"file1b.sst", 52, 100, 4, "_overwrite2", 1, 35, 39},
      {"file0a.sst", 0, 100, 16, "_overwrite3", 0, 40, 49},
      {"file0b.sst", 0, 100, 16, "_overwrite4", 0, 50, 59},
  };

  SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
  ExportImportFilesMetaData metadata;
  for (const SstSpec& spec : kSstSpecs) {
    ASSERT_OK(sfw_cf1.Open(sst_files_dir_ + spec.file_name));
    for (int i = spec.begin; i < spec.end; i += spec.step) {
      ASSERT_OK(sfw_cf1.Put(Key(i), Key(i) + spec.value_suffix));
    }
    ASSERT_OK(sfw_cf1.Finish());
    metadata.files.push_back(
        LiveFileMetaDataInit(spec.file_name, sst_files_dir_, spec.level,
                             spec.smallest_seqno, spec.largest_seqno));
  }
  metadata.db_comparator_name = options.comparator->Name();

  // Import sst files and verify
  ASSERT_OK(db_->CreateColumnFamilyWithImport(
      options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  // Value suffix expected for Key(i) right after the import.
  const auto imported_suffix = [](int i) -> const char* {
    if (i % 16 == 0) {
      return "_overwrite4";
    }
    if (i % 4 == 0) {
      return "_overwrite2";
    }
    if (i % 2 == 0) {
      return "_overwrite1";
    }
    return "_val";
  };
  // Value suffix expected for Key(i) once every fifth key has been
  // overwritten through the DB.
  const auto overwritten_suffix = [&imported_suffix](int i) -> const char* {
    return (i % 5 == 0) ? "_overwrite5" : imported_suffix(i);
  };

  for (int i = 0; i < 100; i++) {
    std::string value;
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value));
    ASSERT_EQ(value, Key(i) + imported_suffix(i));
  }

  for (int i = 0; i < 100; i += 5) {
    ASSERT_OK(
        db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite5"));
  }

  // Flush and check again
  ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
  for (int i = 0; i < 100; i++) {
    std::string value;
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value));
    ASSERT_EQ(value, Key(i) + overwritten_suffix(i));
  }

  // Compact and check again.
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));
  for (int i = 0; i < 100; i++) {
    std::string value;
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value));
    ASSERT_EQ(value, Key(i) + overwritten_suffix(i));
  }
}
// Regression test: the smallest/largest key recorded for an imported file used
// to ignore range tombstones. The file written here has point keys K1 and K2
// plus a range tombstone over [K3, K4), so the largest key of the imported
// file must have user key K4.
TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithRangeTombstone) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  // cf1.sst
  const std::string sst_name = "cf1.sst";
  const std::string sst_path = sst_files_dir_ + sst_name;
  SstFileWriter writer(EnvOptions(), options, handles_[1]);
  ASSERT_OK(writer.Open(sst_path));
  ASSERT_OK(writer.Put("K1", "V1"));
  ASSERT_OK(writer.Put("K2", "V2"));
  ASSERT_OK(writer.DeleteRange("K3", "K4"));
  ASSERT_OK(writer.Finish());

  // Import the file into the new column family "toto".
  ExportImportFilesMetaData import_meta;
  import_meta.db_comparator_name = options.comparator->Name();
  import_meta.files.push_back(LiveFileMetaDataInit(
      sst_name, sst_files_dir_, /*level=*/0, /*smallest_seqno=*/0,
      /*largest_seqno=*/19));
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto",
                                              ImportColumnFamilyOptions(),
                                              import_meta, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  // Exactly one file is expected; locate it on whichever level holds it.
  ColumnFamilyMetaData import_cf_meta;
  db_->GetColumnFamilyMetaData(import_cfh_, &import_cf_meta);
  ASSERT_EQ(import_cf_meta.file_count, 1);
  const SstFileMetaData* imported_file = nullptr;
  for (const auto& level : import_cf_meta.levels) {
    if (level.files.empty()) {
      continue;
    }
    imported_file = &level.files.front();
    break;
  }
  ASSERT_TRUE(imported_file != nullptr);

  // The file's upper bound must come from the range tombstone's end key.
  InternalKey largest_key;
  largest_key.DecodeFrom(imported_file->largest);
  ASSERT_EQ(largest_key.user_key(), "K4");

  // The point keys are still readable.
  std::string read_value;
  ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K1", &read_value));
  ASSERT_EQ(read_value, "V1");
  ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K2", &read_value));
  ASSERT_EQ(read_value, "V2");

  ASSERT_OK(db_->DropColumnFamily(import_cfh_));
  ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
  import_cfh_ = nullptr;
}
// Exports column family "koko" through a checkpoint and imports the exported
// files twice into the same DB: once copying the files ("toto") and once
// moving them ("yoyo"). Both imports must initially mirror "koko"; later
// writes to "toto" must not leak into "yoyo".
TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
  }
  ASSERT_OK(Flush(1));

  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));

  // Overwrite the value in the same set of keys.
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite"));
  }

  // Flush to create L0 file.
  ASSERT_OK(Flush(1));
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite2"));
  }

  // Flush again to create another L0 file. It should have a higher sequence
  // number.
  ASSERT_OK(Flush(1));

  {
    // The checkpoint is held by a unique_ptr so that it is released even when
    // one of the assertions below returns early from the test.
    Checkpoint* raw_checkpoint = nullptr;
    ASSERT_OK(Checkpoint::Create(db_, &raw_checkpoint));
    std::unique_ptr<Checkpoint> checkpoint(raw_checkpoint);
    ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                             &metadata_ptr_));
    ASSERT_NE(metadata_ptr_, nullptr);
  }

  // First import copies the exported files, second import moves them.
  ImportColumnFamilyOptions import_options;
  import_options.move_files = false;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
                                              *metadata_ptr_, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  import_options.move_files = true;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options,
                                              *metadata_ptr_, &import_cfh2_));
  ASSERT_NE(import_cfh2_, nullptr);
  delete metadata_ptr_;
  metadata_ptr_ = nullptr;

  // Both imported column families must match the source column family.
  std::string value1, value2;

  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
    ASSERT_EQ(Get(1, Key(i)), value1);
  }

  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2));
    ASSERT_EQ(Get(1, Key(i)), value2);
  }

  // Modify keys in the first imported column family ("toto") and verify:
  // keys [0, 25) are deleted and keys [25, 50) are overwritten.
  for (int i = 0; i < 25; i++) {
    ASSERT_OK(db_->Delete(WriteOptions(), import_cfh_, Key(i)));
  }
  for (int i = 25; i < 50; i++) {
    ASSERT_OK(
        db_->Put(WriteOptions(), import_cfh_, Key(i), Key(i) + "_overwrite3"));
  }
  for (int i = 0; i < 25; ++i) {
    ASSERT_TRUE(
        db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
  }
  for (int i = 25; i < 50; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
    ASSERT_EQ(Key(i) + "_overwrite3", value1);
  }
  for (int i = 50; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
    ASSERT_EQ(Key(i) + "_overwrite2", value1);
  }
  // The second imported column family ("yoyo") must still match the source.
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2));
    ASSERT_EQ(Get(1, Key(i)), value2);
  }

  // Compact and check again.
  ASSERT_OK(db_->Flush(FlushOptions(), import_cfh_));
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), import_cfh_, nullptr, nullptr));

  for (int i = 0; i < 25; ++i) {
    ASSERT_TRUE(
        db_->Get(ReadOptions(), import_cfh_, Key(i), &value1).IsNotFound());
  }
  for (int i = 25; i < 50; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
    ASSERT_EQ(Key(i) + "_overwrite3", value1);
  }
  for (int i = 50; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, Key(i), &value1));
    ASSERT_EQ(Key(i) + "_overwrite2", value1);
  }
  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), import_cfh2_, Key(i), &value2));
    ASSERT_EQ(Get(1, Key(i)), value2);
  }
}
// Exports column family "koko" through a checkpoint and imports the exported
// files into a column family of a second, freshly created DB. Every key read
// from the imported column family must match the source column family.
TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  for (int i = 0; i < 100; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
  }
  ASSERT_OK(Flush(1));

  // Compact to create a L1 file.
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));

  // Overwrite the value in the same set of keys.
  for (int i = 0; i < 50; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite"));
  }

  // Flush to create L0 file.
  ASSERT_OK(Flush(1));

  for (int i = 0; i < 25; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_overwrite2"));
  }

  // Flush again to create another L0 file. It should have a higher sequence
  // number.
  ASSERT_OK(Flush(1));

  {
    // The checkpoint is held by a unique_ptr so that it is released even when
    // one of the assertions below returns early from the test.
    Checkpoint* raw_checkpoint = nullptr;
    ASSERT_OK(Checkpoint::Create(db_, &raw_checkpoint));
    std::unique_ptr<Checkpoint> checkpoint(raw_checkpoint);
    ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                             &metadata_ptr_));
    ASSERT_NE(metadata_ptr_, nullptr);
  }

  // Create a new db and import the files.
  const std::string db_copy_path = dbname_ + "/db_copy";
  DB* db_copy = nullptr;
  ASSERT_OK(DestroyDir(env_, db_copy_path));
  ASSERT_OK(DB::Open(options, db_copy_path, &db_copy));
  ColumnFamilyHandle* cfh = nullptr;
  ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                  ImportColumnFamilyOptions(),
                                                  *metadata_ptr_, &cfh));
  ASSERT_NE(cfh, nullptr);

  for (int i = 0; i < 100; ++i) {
    std::string value;
    ASSERT_OK(db_copy->Get(ReadOptions(), cfh, Key(i), &value));
    ASSERT_EQ(Get(1, Key(i)), value);
  }

  // The handle is released before the DB that issued it is deleted.
  ASSERT_OK(db_copy->DropColumnFamily(cfh));
  ASSERT_OK(db_copy->DestroyColumnFamilyHandle(cfh));
  delete db_copy;
  ASSERT_OK(DestroyDir(env_, db_copy_path));
}
// Regression test: the smallest/largest key of an imported file used to
// ignore range tombstones. Exports a column family whose data is covered by
// range tombstones and imports it twice (copying and moving the files). In
// both imports the covered keys must be invisible to a latest read and still
// visible through a snapshot taken before the deletions.
TEST_F(ImportColumnFamilyTest,
       ImportExportedSSTFromAnotherCFWithRangeTombstone) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"koko"}, options);

  // Keys [10, 20) are flushed and moved to level 1.
  for (int i = 10; i < 20; ++i) {
    ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
  }
  ASSERT_OK(Flush(1 /* cf */));
  MoveFilesToLevel(1 /* level */, 1 /* cf */);

  // Snapshot taken before any deletion; used below to read the old values.
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(db_->DeleteRange(WriteOptions(), handles_[1], Key(0), Key(25)));
  ASSERT_OK(Put(1, Key(1), "t"));
  ASSERT_OK(Flush(1));

  // Tests importing a range tombstone only file
  // NOTE(review): this tombstone is not flushed explicitly here; presumably
  // ExportColumnFamily() persists it into its own file -- confirm.
  ASSERT_OK(db_->DeleteRange(WriteOptions(), handles_[1], Key(0), Key(2)));

  {
    // The checkpoint is held by a unique_ptr so that it is released even when
    // one of the assertions below returns early from the test.
    Checkpoint* raw_checkpoint = nullptr;
    ASSERT_OK(Checkpoint::Create(db_, &raw_checkpoint));
    std::unique_ptr<Checkpoint> checkpoint(raw_checkpoint);
    ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                             &metadata_ptr_));
    ASSERT_NE(metadata_ptr_, nullptr);
  }

  // First import copies the exported files, second import moves them.
  ImportColumnFamilyOptions import_options;
  import_options.move_files = false;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
                                              *metadata_ptr_, &import_cfh_));
  ASSERT_NE(import_cfh_, nullptr);

  import_options.move_files = true;
  ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options,
                                              *metadata_ptr_, &import_cfh2_));
  ASSERT_NE(import_cfh2_, nullptr);
  delete metadata_ptr_;
  metadata_ptr_ = nullptr;

  std::string value1, value2;
  ReadOptions ro_latest;
  ReadOptions ro_snapshot;
  ro_snapshot.snapshot = snapshot;

  // First import: deleted at the latest sequence number, visible via snapshot.
  for (int i = 10; i < 20; ++i) {
    ASSERT_TRUE(db_->Get(ro_latest, import_cfh_, Key(i), &value1).IsNotFound());
    ASSERT_OK(db_->Get(ro_snapshot, import_cfh_, Key(i), &value1));
    ASSERT_EQ(Get(1, Key(i), snapshot), value1);
  }
  // Key(1) was written after the first tombstone but is covered by the second.
  ASSERT_TRUE(db_->Get(ro_latest, import_cfh_, Key(1), &value1).IsNotFound());

  // Second import: same expectations.
  for (int i = 10; i < 20; ++i) {
    ASSERT_TRUE(
        db_->Get(ro_latest, import_cfh2_, Key(i), &value1).IsNotFound());

    ASSERT_OK(db_->Get(ro_snapshot, import_cfh2_, Key(i), &value2));
    ASSERT_EQ(Get(1, Key(i), snapshot), value2);
  }
  ASSERT_TRUE(db_->Get(ro_latest, import_cfh2_, Key(1), &value1).IsNotFound());

  db_->ReleaseSnapshot(snapshot);
}
// Imports a column family containing a level where two files overlap at their
// endpoints. "Overlap" means the largest user key in one file is the same as
// the smallest user key in the second file.
TEST_F(ImportColumnFamilyTest, LevelFilesOverlappingAtEndpoints) {
  const int kFileBytes = 128 << 10;  // 128KB
  const int kValueBytes = 1 << 10;   // 1KB
  const int kNumFiles = 4;

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.num_levels = 2;
  CreateAndReopenWithCF({"koko"}, options);

  Random rnd(301);
  // Every key is snapshot protected to ensure older versions will not be
  // dropped during compaction.
  std::vector<const Snapshot*> snapshots;
  snapshots.reserve(kFileBytes / kValueBytes * kNumFiles);
  for (int i = 0; i < kNumFiles; ++i) {
    // Every write targets the same user key, so all versions share it.
    for (int j = 0; j < kFileBytes / kValueBytes; ++j) {
      auto value = rnd.RandomString(kValueBytes);
      ASSERT_OK(Put(1, "key", value));
      snapshots.push_back(db_->GetSnapshot());
    }
    ASSERT_OK(Flush(1));
  }

  // Compact to create overlapping L1 files.
  ASSERT_OK(
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);

  {
    // The checkpoint is held by a unique_ptr so that it is released even when
    // one of the assertions below returns early from the test.
    Checkpoint* raw_checkpoint = nullptr;
    ASSERT_OK(Checkpoint::Create(db_, &raw_checkpoint));
    std::unique_ptr<Checkpoint> checkpoint(raw_checkpoint);
    ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
                                             &metadata_ptr_));
    ASSERT_NE(metadata_ptr_, nullptr);
  }

  // Create a new db and import the files.
  const std::string db_copy_path = dbname_ + "/db_copy";
  DB* db_copy = nullptr;
  ASSERT_OK(DestroyDir(env_, db_copy_path));
  ASSERT_OK(DB::Open(options, db_copy_path, &db_copy));
  ColumnFamilyHandle* cfh = nullptr;
  ASSERT_OK(db_copy->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                  ImportColumnFamilyOptions(),
                                                  *metadata_ptr_, &cfh));
  ASSERT_NE(cfh, nullptr);

  {
    // Only checks that the key is readable; the value itself is not compared.
    std::string value;
    ASSERT_OK(db_copy->Get(ReadOptions(), cfh, "key", &value));
  }

  // The handle is released before the DB that issued it is deleted.
  ASSERT_OK(db_copy->DropColumnFamily(cfh));
  ASSERT_OK(db_copy->DestroyColumnFamilyHandle(cfh));
  delete db_copy;
  ASSERT_OK(DestroyDir(env_, db_copy_path));
  for (const Snapshot* snapshot : snapshots) {
    db_->ReleaseSnapshot(snapshot);
  }
}
// Exercises the failure paths of CreateColumnFamilyWithImport(): an already
// existing column family name, an empty file list, overlapping files within a
// level, a comparator mismatch and a missing file. Every failed import must
// leave the output handle null. The last case also checks that the same
// column family name can be imported successfully after a failed attempt.
TEST_F(ImportColumnFamilyTest, ImportColumnFamilyNegativeTest) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"koko"}, options);

  {
    // Create column family with existing cf name.
    ExportImportFilesMetaData import_meta;

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "koko",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_),
              Status::InvalidArgument("Column family already exists"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with no files specified.
    ExportImportFilesMetaData import_meta;

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_),
              Status::InvalidArgument("The list of files is empty"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with overlapping keys in sst files: both files contain K2 and
    // both are registered on level 1.
    ExportImportFilesMetaData import_meta;
    SstFileWriter writer(EnvOptions(), options, handles_[1]);

    const std::string first_sst_name = "file1.sst";
    const std::string first_sst_path = sst_files_dir_ + first_sst_name;
    ASSERT_OK(writer.Open(first_sst_path));
    ASSERT_OK(writer.Put("K1", "V1"));
    ASSERT_OK(writer.Put("K2", "V2"));
    ASSERT_OK(writer.Finish());

    const std::string second_sst_name = "file2.sst";
    const std::string second_sst_path = sst_files_dir_ + second_sst_name;
    ASSERT_OK(writer.Open(second_sst_path));
    ASSERT_OK(writer.Put("K2", "V2"));
    ASSERT_OK(writer.Put("K3", "V3"));
    ASSERT_OK(writer.Finish());

    import_meta.files.push_back(
        LiveFileMetaDataInit(first_sst_name, sst_files_dir_, 1, 10, 19));
    import_meta.files.push_back(
        LiveFileMetaDataInit(second_sst_name, sst_files_dir_, 1, 10, 19));
    import_meta.db_comparator_name = options.comparator->Name();

    ASSERT_NOK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                 ImportColumnFamilyOptions(),
                                                 import_meta, &import_cfh_));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with a mismatching comparator, should fail with appropriate error.
    ExportImportFilesMetaData import_meta;
    Options reverse_options = CurrentOptions();
    reverse_options.comparator = ReverseBytewiseComparator();
    SstFileWriter writer(EnvOptions(), reverse_options, handles_[1]);

    const std::string sst_name = "file1.sst";
    const std::string sst_path = sst_files_dir_ + sst_name;
    ASSERT_OK(writer.Open(sst_path));
    // Keys are added in descending order to suit the reverse comparator.
    ASSERT_OK(writer.Put("K2", "V2"));
    ASSERT_OK(writer.Put("K1", "V1"));
    ASSERT_OK(writer.Finish());

    import_meta.files.push_back(
        LiveFileMetaDataInit(sst_name, sst_files_dir_, 1, 10, 19));
    import_meta.db_comparator_name = reverse_options.comparator->Name();

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "coco",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_),
              Status::InvalidArgument("Comparator name mismatch"));
    ASSERT_EQ(import_cfh_, nullptr);
  }

  {
    // Import with non existent sst file should fail with appropriate error
    ExportImportFilesMetaData import_meta;
    SstFileWriter writer(EnvOptions(), options, handles_[1]);

    const std::string existing_sst_name = "file1.sst";
    const std::string existing_sst_path = sst_files_dir_ + existing_sst_name;
    ASSERT_OK(writer.Open(existing_sst_path));
    ASSERT_OK(writer.Put("K1", "V1"));
    ASSERT_OK(writer.Put("K2", "V2"));
    ASSERT_OK(writer.Finish());

    // file3.sst is listed in the metadata but is not written in this scope.
    const std::string missing_sst_name = "file3.sst";

    import_meta.files.push_back(
        LiveFileMetaDataInit(existing_sst_name, sst_files_dir_, 1, 10, 19));
    import_meta.files.push_back(
        LiveFileMetaDataInit(missing_sst_name, sst_files_dir_, 1, 10, 19));
    import_meta.db_comparator_name = options.comparator->Name();

    ASSERT_EQ(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_),
              Status::IOError("No such file or directory"));
    ASSERT_EQ(import_cfh_, nullptr);

    // Test successful import after a failure with the same CF name. Ensures
    // there is no side effect with CF when there is a failed import
    import_meta.files.pop_back();
    import_meta.db_comparator_name = options.comparator->Name();

    ASSERT_OK(db_->CreateColumnFamilyWithImport(ColumnFamilyOptions(), "yoyo",
                                                ImportColumnFamilyOptions(),
                                                import_meta, &import_cfh_));
    ASSERT_NE(import_cfh_, nullptr);
  }
}
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point: installs the stack trace handler, passes the
// command line to GoogleTest and returns the result of running all tests.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}