Added check_snapshot option in the DB's AddFile function (#1261)

* Added check_snapshot option in the DB's AddFile function

* change check_snapshot to skip_snapshot_check

* add unit test for skip_snapshot_check

* Add skip_snapshot_check comment
This commit is contained in:
Zongzhi Chen 2016-08-10 09:14:13 +08:00 committed by Islam AbdelRahman
parent 9fd68b7fb6
commit 98d0b78eac
6 changed files with 105 additions and 28 deletions

View file

@ -259,10 +259,10 @@ class DBImpl : public DB {
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file) override;
bool move_file, bool skip_snapshot_check) override;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& file_path_list,
bool move_file) override;
bool move_file, bool skip_snapshot_check) override;
#endif // ROCKSDB_LITE

View file

@ -105,7 +105,7 @@ Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& file_path_list,
bool move_file) {
bool move_file, bool skip_snapshot_check) {
Status status;
auto num_files = file_path_list.size();
if (num_files == 0) {
@ -120,12 +120,12 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
return status;
}
}
return AddFile(column_family, file_info_list, move_file);
return AddFile(column_family, file_info_list, move_file, skip_snapshot_check);
}
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file) {
bool move_file, bool skip_snapshot_check) {
Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
@ -244,7 +244,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
if (!snapshots_.empty()) {
if (!skip_snapshot_check && !snapshots_.empty()) {
// Check that no snapshots are being held
status =
Status::NotSupported("Cannot add a file while holding snapshots");

View file

@ -1272,6 +1272,81 @@ TEST_F(DBSSTTest, AddExternalSstFileNoCopy) {
}
}
TEST_F(DBSSTTest, AddExternalSstFileSkipSnapshot) {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
options.env = env_;
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
// file1.sst (0 => 99)
std::string file1 = sst_files_folder + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 0; k < 100; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
// file2.sst (100 => 299)
std::string file2 = sst_files_folder + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
for (int k = 100; k < 300; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 200);
ASSERT_EQ(file2_info.smallest_key, Key(100));
ASSERT_EQ(file2_info.largest_key, Key(299));
ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file1_info)));
// Add file will fail when holding snapshot and use the default
// skip_snapshot_check to false
const Snapshot* s1 = db_->GetSnapshot();
if (s1 != nullptr) {
ASSERT_NOK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info)));
}
// Add file will success when set skip_snapshot_check to true even db holding
// snapshot
if (s1 != nullptr) {
ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info), false, true));
db_->ReleaseSnapshot(s1);
}
// file3.sst (300 => 399)
std::string file3 = sst_files_folder + "file3.sst";
ASSERT_OK(sst_file_writer.Open(file3));
for (int k = 300; k < 400; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 100);
ASSERT_EQ(file3_info.smallest_key, Key(300));
ASSERT_EQ(file3_info.largest_key, Key(399));
// check that we have change the old key
ASSERT_EQ(Get(Key(300)), "NOT_FOUND");
const Snapshot* s2 = db_->GetSnapshot();
ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file3_info), false, true));
ASSERT_EQ(Get(Key(300)), Key(300) + ("_val"));
ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val"));
}
TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
// Bulk load 10 files every file contain 1000 keys

View file

@ -2649,12 +2649,12 @@ class ModelDB : public DB {
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file) override {
bool move_file, bool skip_snapshot_check) override {
return Status::NotSupported("Not implemented.");
}
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& file_path_list,
bool move_file) override {
bool move_file, bool skip_snapshot_check) override {
return Status::NotSupported("Not implemented.");
}

View file

@ -806,22 +806,24 @@ class DB {
// "column_family", a vector of ExternalSstFileInfo can be used
// instead of "file_path_list" to do a blind batch add that wont
// need to read the file, move_file can be set to true to
// move the files instead of copying them.
// move the files instead of copying them, skip_snapshot_check can be set to
// true to ignore the snapshot, make sure that you know that when you use it,
// snapshots see the data that is added in the new files.
//
// Current Requirements:
// (1) The key ranges of the files don't overlap with each other
// (2) The key range of any file in list doesn't overlap with
// existing keys or tombstones in DB.
// (3) No snapshots are held.
// (3) No snapshots are held (check skip_snapshot_check to skip this check).
//
// Notes: We will try to ingest the files to the lowest possible level
// even if the file compression dont match the level compression
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& file_path_list,
bool move_file = false) = 0;
bool move_file = false, bool skip_snapshot_check = false) = 0;
virtual Status AddFile(const std::vector<std::string>& file_path_list,
bool move_file = false) {
return AddFile(DefaultColumnFamily(), file_path_list, move_file);
bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(DefaultColumnFamily(), file_path_list, move_file, skip_snapshot_check);
}
#if defined(__GNUC__) || defined(__clang__)
__attribute__((__deprecated__))
@ -830,9 +832,9 @@ class DB {
#endif
virtual Status
AddFile(ColumnFamilyHandle* column_family, const std::string& file_path,
bool move_file = false) {
bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(column_family, std::vector<std::string>(1, file_path),
move_file);
move_file, skip_snapshot_check);
}
#if defined(__GNUC__) || defined(__clang__)
__attribute__((__deprecated__))
@ -840,18 +842,18 @@ class DB {
__declspec(deprecated)
#endif
virtual Status
AddFile(const std::string& file_path, bool move_file = false) {
AddFile(const std::string& file_path, bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(DefaultColumnFamily(),
std::vector<std::string>(1, file_path), move_file);
std::vector<std::string>(1, file_path), move_file, skip_snapshot_check);
}
// Load table file with information "file_info" into "column_family"
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file = false) = 0;
bool move_file = false, bool skip_snapshot_check = false) = 0;
virtual Status AddFile(const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file = false) {
return AddFile(DefaultColumnFamily(), file_info_list, move_file);
bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(DefaultColumnFamily(), file_info_list, move_file, skip_snapshot_check);
}
#if defined(__GNUC__) || defined(__clang__)
__attribute__((__deprecated__))
@ -860,9 +862,9 @@ class DB {
#endif
virtual Status
AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_info, bool move_file = false) {
const ExternalSstFileInfo* file_info, bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(column_family,
std::vector<ExternalSstFileInfo>(1, *file_info), move_file);
std::vector<ExternalSstFileInfo>(1, *file_info), move_file, skip_snapshot_check);
}
#if defined(__GNUC__) || defined(__clang__)
__attribute__((__deprecated__))
@ -870,9 +872,9 @@ class DB {
__declspec(deprecated)
#endif
virtual Status
AddFile(const ExternalSstFileInfo* file_info, bool move_file = false) {
AddFile(const ExternalSstFileInfo* file_info, bool move_file = false, bool skip_snapshot_check = false) {
return AddFile(DefaultColumnFamily(),
std::vector<ExternalSstFileInfo>(1, *file_info), move_file);
std::vector<ExternalSstFileInfo>(1, *file_info), move_file, skip_snapshot_check);
}
#endif // ROCKSDB_LITE

View file

@ -71,13 +71,13 @@ class StackableDB : public DB {
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<ExternalSstFileInfo>& file_info_list,
bool move_file) override {
return db_->AddFile(column_family, file_info_list, move_file);
bool move_file, bool skip_snapshot_check) override {
return db_->AddFile(column_family, file_info_list, move_file, skip_snapshot_check);
}
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& file_path_list,
bool move_file) override {
return db_->AddFile(column_family, file_path_list, move_file);
bool move_file, bool skip_snapshot_check) override {
return db_->AddFile(column_family, file_path_list, move_file, skip_snapshot_check);
}
using DB::KeyMayExist;