mirror of https://github.com/facebook/rocksdb.git
Support range deletion tombstones in IngestExternalFile SSTs (#3778)
Summary: Fixes #3391. This change adds a `DeleteRange` method to `SstFileWriter` and adds support for ingesting SSTs with range deletion tombstones. This is important for applications that need to atomically ingest SSTs while clearing out any existing keys in a given key range. Pull Request resolved: https://github.com/facebook/rocksdb/pull/3778 Differential Revision: D8821836 Pulled By: anand1976 fbshipit-source-id: ca7786c1947ff129afa703dab011d524c7883844
This commit is contained in:
parent
91d7c03cdc
commit
ef7815b803
2
USERS.md
2
USERS.md
|
@ -26,7 +26,7 @@ Learn more about those use cases in a Tech Talk by Ankit Gupta and Naveen Somasu
|
|||
Yahoo is using RocksDB as a storage engine for their biggest distributed data store Sherpa. Learn more about it here: http://yahooeng.tumblr.com/post/120730204806/sherpa-scales-new-heights
|
||||
|
||||
## CockroachDB
|
||||
CockroachDB is an open-source geo-replicated transactional database (still in development). They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach
|
||||
CockroachDB is an open-source geo-replicated transactional database. They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach
|
||||
|
||||
## DNANexus
|
||||
DNANexus is using RocksDB to speed up processing of genomics data.
|
||||
|
|
|
@ -23,12 +23,6 @@ class DBRangeDelTest : public DBTestBase {
|
|||
}
|
||||
};
|
||||
|
||||
const int kRangeDelSkipConfigs =
|
||||
// Plain tables do not support range deletions.
|
||||
DBRangeDelTest::kSkipPlainTable |
|
||||
// MmapReads disables the iterator pinning that RangeDelAggregator requires.
|
||||
DBRangeDelTest::kSkipMmapReads;
|
||||
|
||||
// PlainTableFactory and NumTableFilesAtLevel() are not supported in
|
||||
// ROCKSDB_LITE
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
@ -39,17 +33,17 @@ TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) {
|
|||
for (auto config : {kPlainTableAllBytesPrefix, /* kWalDirAndMmapReads */}) {
|
||||
option_config_ = config;
|
||||
DestroyAndReopen(CurrentOptions());
|
||||
ASSERT_TRUE(
|
||||
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1")
|
||||
.IsNotSupported());
|
||||
ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
"dr1", "dr1")
|
||||
.IsNotSupported());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
|
||||
do {
|
||||
DestroyAndReopen(CurrentOptions());
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
|
||||
"dr2"));
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
"dr1", "dr2"));
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(0));
|
||||
} while (ChangeOptions(kRangeDelSkipConfigs));
|
||||
|
|
|
@ -733,6 +733,13 @@ class DBTestBase : public testing::Test {
|
|||
kSkipMmapReads = 256,
|
||||
};
|
||||
|
||||
const int kRangeDelSkipConfigs =
|
||||
// Plain tables do not support range deletions.
|
||||
kSkipPlainTable |
|
||||
// MmapReads disables the iterator pinning that RangeDelAggregator
|
||||
// requires.
|
||||
kSkipMmapReads;
|
||||
|
||||
explicit DBTestBase(const std::string path);
|
||||
|
||||
~DBTestBase();
|
||||
|
|
|
@ -39,7 +39,8 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
|||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys,
|
||||
const std::vector<ValueType>& value_types, int file_id,
|
||||
const std::vector<ValueType>& value_types,
|
||||
std::vector<std::pair<int, int>> range_deletions, int file_id,
|
||||
std::map<std::string, std::string>* true_data) {
|
||||
assert(value_types.size() == 1 || keys.size() == value_types.size());
|
||||
std::string file_path = sst_files_dir_ + ToString(file_id);
|
||||
|
@ -49,6 +50,29 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
for (size_t i = 0; i < range_deletions.size(); i++) {
|
||||
// Account for the effect of range deletions on true_data before
|
||||
// all point operators, even though sst_file_writer.DeleteRange
|
||||
// must be called before other sst_file_writer methods. This is
|
||||
// because point writes take precedence over range deletions
|
||||
// in the same ingested sst.
|
||||
std::string start_key = Key(range_deletions[i].first);
|
||||
std::string end_key = Key(range_deletions[i].second);
|
||||
s = sst_file_writer.DeleteRange(start_key, end_key);
|
||||
if (!s.ok()) {
|
||||
sst_file_writer.Finish();
|
||||
return s;
|
||||
}
|
||||
auto start_key_it = true_data->find(start_key);
|
||||
if (start_key_it == true_data->end()) {
|
||||
start_key_it = true_data->upper_bound(start_key);
|
||||
}
|
||||
auto end_key_it = true_data->find(end_key);
|
||||
if (end_key_it == true_data->end()) {
|
||||
end_key_it = true_data->upper_bound(end_key);
|
||||
}
|
||||
true_data->erase(start_key_it, end_key_it);
|
||||
}
|
||||
for (size_t i = 0; i < keys.size(); i++) {
|
||||
std::string key = Key(keys[i]);
|
||||
std::string value = Key(keys[i]) + ToString(file_id);
|
||||
|
@ -86,6 +110,14 @@ class ExternalSSTFileBasicTest : public DBTestBase {
|
|||
return s;
|
||||
}
|
||||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys,
|
||||
const std::vector<ValueType>& value_types, int file_id,
|
||||
std::map<std::string, std::string>* true_data) {
|
||||
return GenerateAndAddExternalFile(options, keys, value_types, {}, file_id,
|
||||
true_data);
|
||||
}
|
||||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys, const ValueType value_type,
|
||||
int file_id, std::map<std::string, std::string>* true_data) {
|
||||
|
@ -126,9 +158,14 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
|
|||
ASSERT_EQ(file1_info.num_entries, 100);
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
ASSERT_EQ(file1_info.num_range_del_entries, 0);
|
||||
ASSERT_EQ(file1_info.smallest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.largest_range_del_key, "");
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
s = sst_file_writer.DeleteRange(Key(100), Key(200));
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
DestroyAndReopen(options);
|
||||
// Add file using file path
|
||||
|
@ -189,6 +226,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
|
|||
ASSERT_EQ(file3_info.num_entries, 15);
|
||||
ASSERT_EQ(file3_info.smallest_key, Key(110));
|
||||
ASSERT_EQ(file3_info.largest_key, Key(124));
|
||||
|
||||
s = DeprecatedAddFile({file1}, true /* move file */);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));
|
||||
|
@ -197,8 +235,8 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
|
|||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_OK(env_->FileExists(file2));
|
||||
|
||||
// This file have overlapping values with the existing data
|
||||
s = DeprecatedAddFile({file2}, true /* move file */);
|
||||
// This file has overlapping values with the existing data
|
||||
s = DeprecatedAddFile({file3}, true /* move file */);
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
ASSERT_OK(env_->FileExists(file3));
|
||||
|
||||
|
@ -218,34 +256,33 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
|
|||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {11, 15, 19}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
// Write some keys through normal write path
|
||||
|
@ -257,17 +294,17 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
|
|||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {20, 30, 40}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
@ -314,37 +351,54 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
|
|||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13},
|
||||
ValueType::kTypeValue, file_id++,
|
||||
|
||||
&true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19},
|
||||
ValueType::kTypeDeletion, file_id++,
|
||||
&true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, ValueType::kTypeDeletion, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {120},
|
||||
{ValueType::kTypeValue}, {{120, 135}},
|
||||
file_id++, &true_data));
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {}, {}, {{110, 120}},
|
||||
file_id++, &true_data));
|
||||
// The range deletion ends on a key, but it doesn't actually delete
|
||||
// this key because the largest key in the range is exclusive. Still,
|
||||
// it counts as an overlap so a new seqno will be assigned.
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {}, {}, {{100, 109}},
|
||||
file_id++, &true_data));
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
|
||||
|
||||
// Write some keys through normal write path
|
||||
for (int i = 0; i < 50; i++) {
|
||||
ASSERT_OK(Put(Key(i), "memtable"));
|
||||
|
@ -354,18 +408,18 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
|
|||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, ValueType::kTypeMerge, file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40},
|
||||
ValueType::kTypeDeletion, file_id++,
|
||||
&true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
@ -414,7 +468,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
|
|||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge, ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
|
@ -422,35 +476,58 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
|
|||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 4, 6}, {ValueType::kTypeDeletion, ValueType::kTypeValue,
|
||||
ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {11, 15, 19}, {ValueType::kTypeDeletion, ValueType::kTypeMerge,
|
||||
ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {120, 130}, {ValueType::kTypeValue, ValueType::kTypeMerge},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {1, 130}, {ValueType::kTypeMerge, ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {150, 151, 152},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge,
|
||||
ValueType::kTypeDeletion},
|
||||
{{150, 160}, {180, 190}}, file_id++, &true_data));
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {150, 151, 152},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue},
|
||||
{{200, 250}}, file_id++, &true_data));
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {300, 301, 302},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge,
|
||||
ValueType::kTypeDeletion},
|
||||
{{1, 2}, {152, 154}}, file_id++, &true_data));
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 5);
|
||||
|
||||
// Write some keys through normal write path
|
||||
for (int i = 0; i < 50; i++) {
|
||||
ASSERT_OK(Put(Key(i), "memtable"));
|
||||
|
@ -462,14 +539,15 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
|
|||
options, {60, 61, 62},
|
||||
{ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
// File dont overwrite any keys, No seqno needed
|
||||
// File doesn't overwrite any keys, no seqno needed
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {40, 41, 42}, {ValueType::kTypeValue, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeDeletion},
|
||||
options, {40, 41, 42},
|
||||
{ValueType::kTypeValue, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
|
||||
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
|
@ -477,7 +555,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) {
|
|||
{ValueType::kTypeDeletion, ValueType::kTypeDeletion,
|
||||
ValueType::kTypeDeletion},
|
||||
file_id++, &true_data));
|
||||
// File overwrite some keys, a seqno will be assigned
|
||||
// File overwrites some keys, a seqno will be assigned
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
|
||||
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
@ -589,7 +667,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
|
|||
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {60, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
{{65, 70}, {70, 85}}, file_id++, &true_data));
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
|
||||
ASSERT_EQ(2, NumTableFilesAtLevel(0));
|
||||
ASSERT_EQ(0, NumTableFilesAtLevel(kNumLevels - 2));
|
||||
|
@ -605,6 +683,16 @@ TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
|
|||
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
|
||||
|
||||
// overlaps with L5 file but not memtable or L0 file, so flush is skipped and
|
||||
// file is ingested into L4
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {}, {}, {{5, 15}}, file_id++,
|
||||
&true_data));
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
|
||||
ASSERT_EQ(2, NumTableFilesAtLevel(0));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 2));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
|
||||
|
||||
// ingested file overlaps with memtable, so flush is triggered before the file
|
||||
// is ingested such that the ingested data is considered newest. So L0 file
|
||||
// count increases by two.
|
||||
|
@ -623,7 +711,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
|
|||
// seqnum.
|
||||
ASSERT_OK(GenerateAndAddExternalFile(
|
||||
options, {151, 175}, {ValueType::kTypeValue, ValueType::kTypeValue},
|
||||
file_id++, &true_data));
|
||||
{{160, 200}}, file_id++, &true_data));
|
||||
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
|
||||
ASSERT_EQ(4, NumTableFilesAtLevel(0));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(kNumLevels - 2));
|
||||
|
|
|
@ -78,7 +78,7 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
}
|
||||
|
||||
for (IngestedFileInfo& f : files_to_ingest_) {
|
||||
if (f.num_entries == 0) {
|
||||
if (f.num_entries == 0 && f.num_range_deletions == 0) {
|
||||
return Status::InvalidArgument("File contain no entries");
|
||||
}
|
||||
|
||||
|
@ -358,6 +358,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
|||
}
|
||||
// Get number of entries in table
|
||||
file_to_ingest->num_entries = props->num_entries;
|
||||
file_to_ingest->num_range_deletions = props->num_range_deletions;
|
||||
|
||||
ParsedInternalKey key;
|
||||
ReadOptions ro;
|
||||
|
@ -369,26 +370,55 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
|||
ro.fill_cache = false;
|
||||
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
|
||||
ro, sv->mutable_cf_options.prefix_extractor.get()));
|
||||
std::unique_ptr<InternalIterator> range_del_iter(
|
||||
table_reader->NewRangeTombstoneIterator(ro));
|
||||
|
||||
// Get first (smallest) key from file
|
||||
// Get first (smallest) and last (largest) key from file.
|
||||
bool bounds_set = false;
|
||||
iter->SeekToFirst();
|
||||
if (!ParseInternalKey(iter->key(), &key)) {
|
||||
return Status::Corruption("external file have corrupted keys");
|
||||
}
|
||||
if (key.sequence != 0) {
|
||||
return Status::Corruption("external file have non zero sequence number");
|
||||
}
|
||||
file_to_ingest->smallest_user_key = key.user_key.ToString();
|
||||
if (iter->Valid()) {
|
||||
if (!ParseInternalKey(iter->key(), &key)) {
|
||||
return Status::Corruption("external file have corrupted keys");
|
||||
}
|
||||
if (key.sequence != 0) {
|
||||
return Status::Corruption("external file have non zero sequence number");
|
||||
}
|
||||
file_to_ingest->smallest_user_key = key.user_key.ToString();
|
||||
|
||||
// Get last (largest) key from file
|
||||
iter->SeekToLast();
|
||||
if (!ParseInternalKey(iter->key(), &key)) {
|
||||
return Status::Corruption("external file have corrupted keys");
|
||||
iter->SeekToLast();
|
||||
if (!ParseInternalKey(iter->key(), &key)) {
|
||||
return Status::Corruption("external file have corrupted keys");
|
||||
}
|
||||
if (key.sequence != 0) {
|
||||
return Status::Corruption("external file have non zero sequence number");
|
||||
}
|
||||
file_to_ingest->largest_user_key = key.user_key.ToString();
|
||||
|
||||
bounds_set = true;
|
||||
}
|
||||
if (key.sequence != 0) {
|
||||
return Status::Corruption("external file have non zero sequence number");
|
||||
|
||||
// We may need to adjust these key bounds, depending on whether any range
|
||||
// deletion tombstones extend past them.
|
||||
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
|
||||
if (range_del_iter != nullptr) {
|
||||
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
|
||||
range_del_iter->Next()) {
|
||||
if (!ParseInternalKey(range_del_iter->key(), &key)) {
|
||||
return Status::Corruption("external file have corrupted keys");
|
||||
}
|
||||
RangeTombstone tombstone(key, range_del_iter->value());
|
||||
|
||||
if (!bounds_set || ucmp->Compare(tombstone.start_key_,
|
||||
file_to_ingest->smallest_user_key) < 0) {
|
||||
file_to_ingest->smallest_user_key = tombstone.start_key_.ToString();
|
||||
}
|
||||
if (!bounds_set || ucmp->Compare(tombstone.end_key_,
|
||||
file_to_ingest->largest_user_key) > 0) {
|
||||
file_to_ingest->largest_user_key = tombstone.end_key_.ToString();
|
||||
}
|
||||
bounds_set = true;
|
||||
}
|
||||
}
|
||||
file_to_ingest->largest_user_key = key.user_key.ToString();
|
||||
|
||||
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
|
||||
|
||||
|
|
|
@ -36,6 +36,8 @@ struct IngestedFileInfo {
|
|||
uint64_t file_size;
|
||||
// total number of keys in external file
|
||||
uint64_t num_entries;
|
||||
// total number of range deletions in external file
|
||||
uint64_t num_range_deletions;
|
||||
// Id of column family this file shoule be ingested into
|
||||
uint32_t cf_id;
|
||||
// TableProperties read from external file
|
||||
|
|
|
@ -222,6 +222,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
|||
ASSERT_EQ(file1_info.num_entries, 100);
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
ASSERT_EQ(file1_info.num_range_del_entries, 0);
|
||||
ASSERT_EQ(file1_info.smallest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.largest_range_del_key, "");
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
@ -290,6 +293,58 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
|||
ASSERT_EQ(file5_info.smallest_key, Key(400));
|
||||
ASSERT_EQ(file5_info.largest_key, Key(499));
|
||||
|
||||
// file6.sst (delete 400 => 500)
|
||||
std::string file6 = sst_files_dir_ + "file6.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file6));
|
||||
sst_file_writer.DeleteRange(Key(400), Key(500));
|
||||
ExternalSstFileInfo file6_info;
|
||||
s = sst_file_writer.Finish(&file6_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(file6_info.file_path, file6);
|
||||
ASSERT_EQ(file6_info.num_entries, 0);
|
||||
ASSERT_EQ(file6_info.smallest_key, "");
|
||||
ASSERT_EQ(file6_info.largest_key, "");
|
||||
ASSERT_EQ(file6_info.num_range_del_entries, 1);
|
||||
ASSERT_EQ(file6_info.smallest_range_del_key, Key(400));
|
||||
ASSERT_EQ(file6_info.largest_range_del_key, Key(500));
|
||||
|
||||
// file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
|
||||
std::string file7 = sst_files_dir_ + "file7.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file7));
|
||||
sst_file_writer.DeleteRange(Key(500), Key(550));
|
||||
for (int k = 520; k < 560; k += 2) {
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
sst_file_writer.DeleteRange(Key(525), Key(575));
|
||||
for (int k = 560; k < 600; k += 2) {
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file7_info;
|
||||
s = sst_file_writer.Finish(&file7_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(file7_info.file_path, file7);
|
||||
ASSERT_EQ(file7_info.num_entries, 40);
|
||||
ASSERT_EQ(file7_info.smallest_key, Key(520));
|
||||
ASSERT_EQ(file7_info.largest_key, Key(598));
|
||||
ASSERT_EQ(file7_info.num_range_del_entries, 2);
|
||||
ASSERT_EQ(file7_info.smallest_range_del_key, Key(500));
|
||||
ASSERT_EQ(file7_info.largest_range_del_key, Key(575));
|
||||
|
||||
// file8.sst (delete 600 => 700)
|
||||
std::string file8 = sst_files_dir_ + "file8.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file8));
|
||||
sst_file_writer.DeleteRange(Key(600), Key(700));
|
||||
ExternalSstFileInfo file8_info;
|
||||
s = sst_file_writer.Finish(&file8_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(file8_info.file_path, file8);
|
||||
ASSERT_EQ(file8_info.num_entries, 0);
|
||||
ASSERT_EQ(file8_info.smallest_key, "");
|
||||
ASSERT_EQ(file8_info.largest_key, "");
|
||||
ASSERT_EQ(file8_info.num_range_del_entries, 1);
|
||||
ASSERT_EQ(file8_info.smallest_range_del_key, Key(600));
|
||||
ASSERT_EQ(file8_info.largest_range_del_key, Key(700));
|
||||
|
||||
// Cannot create an empty sst file
|
||||
std::string file_empty = sst_files_dir_ + "file_empty.sst";
|
||||
ExternalSstFileInfo file_empty_info;
|
||||
|
@ -336,6 +391,16 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
|||
// Key range of file5 (400 => 499) dont overlap with any keys in DB
|
||||
ASSERT_OK(DeprecatedAddFile({file5}));
|
||||
|
||||
// This file has overlapping values with the existing data
|
||||
s = DeprecatedAddFile({file6});
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
// Key range of file7 (500 => 598) dont overlap with any keys in DB
|
||||
ASSERT_OK(DeprecatedAddFile({file7}));
|
||||
|
||||
// Key range of file7 (600 => 700) dont overlap with any keys in DB
|
||||
ASSERT_OK(DeprecatedAddFile({file8}));
|
||||
|
||||
// Make sure values are correct before and after flush/compaction
|
||||
for (int i = 0; i < 2; i++) {
|
||||
for (int k = 0; k < 200; k++) {
|
||||
|
@ -349,6 +414,13 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
|||
std::string value = Key(k) + "_val";
|
||||
ASSERT_EQ(Get(Key(k)), value);
|
||||
}
|
||||
for (int k = 500; k < 600; k++) {
|
||||
std::string value = Key(k) + "_val";
|
||||
if (k < 520 || k % 2 == 1) {
|
||||
value = "NOT_FOUND";
|
||||
}
|
||||
ASSERT_EQ(Get(Key(k)), value);
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
}
|
||||
|
@ -377,7 +449,8 @@ TEST_F(ExternalSSTFileTest, Basic) {
|
|||
ASSERT_EQ(Get(Key(k)), value);
|
||||
}
|
||||
DestroyAndRecreateExternalSSTFilesDir();
|
||||
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
|
||||
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
|
||||
kRangeDelSkipConfigs));
|
||||
}
|
||||
class SstFileWriterCollector : public TablePropertiesCollector {
|
||||
public:
|
||||
|
@ -518,17 +591,57 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
|||
ASSERT_EQ(file5_info.smallest_key, Key(200));
|
||||
ASSERT_EQ(file5_info.largest_key, Key(299));
|
||||
|
||||
// file6.sst (delete 0 => 100)
|
||||
std::string file6 = sst_files_dir_ + "file6.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file6));
|
||||
ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
|
||||
ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
|
||||
ExternalSstFileInfo file6_info;
|
||||
s = sst_file_writer.Finish(&file6_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(file6_info.file_path, file6);
|
||||
ASSERT_EQ(file6_info.num_entries, 0);
|
||||
ASSERT_EQ(file6_info.smallest_key, "");
|
||||
ASSERT_EQ(file6_info.largest_key, "");
|
||||
ASSERT_EQ(file6_info.num_range_del_entries, 2);
|
||||
ASSERT_EQ(file6_info.smallest_range_del_key, Key(0));
|
||||
ASSERT_EQ(file6_info.largest_range_del_key, Key(100));
|
||||
|
||||
// file7.sst (delete 100 => 200)
|
||||
std::string file7 = sst_files_dir_ + "file7.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file7));
|
||||
ASSERT_OK(sst_file_writer.DeleteRange(Key(100), Key(200)));
|
||||
ExternalSstFileInfo file7_info;
|
||||
s = sst_file_writer.Finish(&file7_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(file7_info.file_path, file7);
|
||||
ASSERT_EQ(file7_info.num_entries, 0);
|
||||
ASSERT_EQ(file7_info.smallest_key, "");
|
||||
ASSERT_EQ(file7_info.largest_key, "");
|
||||
ASSERT_EQ(file7_info.num_range_del_entries, 1);
|
||||
ASSERT_EQ(file7_info.smallest_range_del_key, Key(100));
|
||||
ASSERT_EQ(file7_info.largest_range_del_key, Key(200));
|
||||
|
||||
// list 1 has internal key range conflict
|
||||
std::vector<std::string> file_list0({file1, file2});
|
||||
std::vector<std::string> file_list1({file3, file2, file1});
|
||||
std::vector<std::string> file_list2({file5});
|
||||
std::vector<std::string> file_list3({file3, file4});
|
||||
std::vector<std::string> file_list4({file5, file7});
|
||||
std::vector<std::string> file_list5({file6, file7});
|
||||
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// This list of files have key ranges are overlapping with each other
|
||||
// These lists of files have key ranges that overlap with each other
|
||||
s = DeprecatedAddFile(file_list1);
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
// Both of the following overlap on the end key of a range deletion
|
||||
// tombstone. This is a limitation because these tombstones have exclusive
|
||||
// end keys that should not count as overlapping with other keys.
|
||||
s = DeprecatedAddFile(file_list4);
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
s = DeprecatedAddFile(file_list5);
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
// Add files using file path list
|
||||
s = DeprecatedAddFile(file_list0);
|
||||
|
@ -619,7 +732,8 @@ TEST_F(ExternalSSTFileTest, AddList) {
|
|||
ASSERT_EQ(Get(Key(k)), value);
|
||||
}
|
||||
DestroyAndRecreateExternalSSTFilesDir();
|
||||
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
|
||||
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
|
||||
kRangeDelSkipConfigs));
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileTest, AddListAtomicity) {
|
||||
|
|
|
@ -435,10 +435,6 @@ Status RangeDelAggregator::AddTombstones(
|
|||
input->SeekToFirst();
|
||||
bool first_iter = true;
|
||||
while (input->Valid()) {
|
||||
// The tombstone map holds slices into the iterator's memory. This assert
|
||||
// ensures pinning the iterator also pins the keys/values.
|
||||
assert(input->IsKeyPinned() && input->IsValuePinned());
|
||||
|
||||
if (first_iter) {
|
||||
if (rep_ == nullptr) {
|
||||
InitRep({upper_bound_});
|
||||
|
@ -448,10 +444,29 @@ Status RangeDelAggregator::AddTombstones(
|
|||
first_iter = false;
|
||||
}
|
||||
ParsedInternalKey parsed_key;
|
||||
if (!ParseInternalKey(input->key(), &parsed_key)) {
|
||||
bool parsed;
|
||||
if (input->IsKeyPinned()) {
|
||||
parsed = ParseInternalKey(input->key(), &parsed_key);
|
||||
} else {
|
||||
// The tombstone map holds slices into the iterator's memory. Make a
|
||||
// copy of the key if it is not pinned.
|
||||
rep_->pinned_slices_.emplace_back(input->key().data(),
|
||||
input->key().size());
|
||||
parsed = ParseInternalKey(rep_->pinned_slices_.back(), &parsed_key);
|
||||
}
|
||||
if (!parsed) {
|
||||
return Status::Corruption("Unable to parse range tombstone InternalKey");
|
||||
}
|
||||
RangeTombstone tombstone(parsed_key, input->value());
|
||||
RangeTombstone tombstone;
|
||||
if (input->IsValuePinned()) {
|
||||
tombstone = RangeTombstone(parsed_key, input->value());
|
||||
} else {
|
||||
// The tombstone map holds slices into the iterator's memory. Make a
|
||||
// copy of the value if it is not pinned.
|
||||
rep_->pinned_slices_.emplace_back(input->value().data(),
|
||||
input->value().size());
|
||||
tombstone = RangeTombstone(parsed_key, rep_->pinned_slices_.back());
|
||||
}
|
||||
// Truncate the tombstone to the range [smallest, largest].
|
||||
if (smallest != nullptr) {
|
||||
if (icmp_.user_comparator()->Compare(
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
@ -105,7 +106,7 @@ class RangeDelAggregator {
|
|||
// covered by a range tombstone residing in the same snapshot stripe.
|
||||
// @param mode If collapse_deletions_ is true, this dictates how we will find
|
||||
// the deletion whose interval contains this key. Otherwise, its
|
||||
// value must be kFullScan indicating linear scan from beginning..
|
||||
// value must be kFullScan indicating linear scan from beginning.
|
||||
bool ShouldDelete(
|
||||
const ParsedInternalKey& parsed,
|
||||
RangeDelPositioningMode mode = RangeDelPositioningMode::kFullScan) {
|
||||
|
@ -173,6 +174,7 @@ class RangeDelAggregator {
|
|||
struct Rep {
|
||||
StripeMap stripe_map_;
|
||||
PinnedIteratorsManager pinned_iters_mgr_;
|
||||
std::list<std::string> pinned_slices_;
|
||||
std::set<uint64_t> added_files_;
|
||||
};
|
||||
// Initializes rep_ lazily. This aggregator object is constructed for every
|
||||
|
|
|
@ -32,9 +32,12 @@ struct ExternalSstFileInfo {
|
|||
: file_path(""),
|
||||
smallest_key(""),
|
||||
largest_key(""),
|
||||
smallest_range_del_key(""),
|
||||
largest_range_del_key(""),
|
||||
sequence_number(0),
|
||||
file_size(0),
|
||||
num_entries(0),
|
||||
num_range_del_entries(0),
|
||||
version(0) {}
|
||||
|
||||
ExternalSstFileInfo(const std::string& _file_path,
|
||||
|
@ -45,17 +48,24 @@ struct ExternalSstFileInfo {
|
|||
: file_path(_file_path),
|
||||
smallest_key(_smallest_key),
|
||||
largest_key(_largest_key),
|
||||
smallest_range_del_key(""),
|
||||
largest_range_del_key(""),
|
||||
sequence_number(_sequence_number),
|
||||
file_size(_file_size),
|
||||
num_entries(_num_entries),
|
||||
num_range_del_entries(0),
|
||||
version(_version) {}
|
||||
|
||||
std::string file_path; // external sst file path
|
||||
std::string smallest_key; // smallest user key in file
|
||||
std::string largest_key; // largest user key in file
|
||||
SequenceNumber sequence_number; // sequence number of all keys in file
|
||||
uint64_t file_size; // file size in bytes
|
||||
uint64_t num_entries; // number of entries in file
|
||||
std::string file_path; // external sst file path
|
||||
std::string smallest_key; // smallest user key in file
|
||||
std::string largest_key; // largest user key in file
|
||||
std::string
|
||||
smallest_range_del_key; // smallest range deletion user key in file
|
||||
std::string largest_range_del_key; // largest range deletion user key in file
|
||||
SequenceNumber sequence_number; // sequence number of all keys in file
|
||||
uint64_t file_size; // file size in bytes
|
||||
uint64_t num_entries; // number of entries in file
|
||||
uint64_t num_range_del_entries; // number of range deletion entries in file
|
||||
int32_t version; // file version
|
||||
};
|
||||
|
||||
|
@ -106,6 +116,9 @@ class SstFileWriter {
|
|||
// REQUIRES: key is after any previously added key according to comparator.
|
||||
Status Delete(const Slice& user_key);
|
||||
|
||||
// Add a range deletion tombstone to currently opened file
|
||||
Status DeleteRange(const Slice& begin_key, const Slice& end_key);
|
||||
|
||||
// Finalize writing to sst file and close file.
|
||||
//
|
||||
// An optional ExternalSstFileInfo pointer can be passed to the function
|
||||
|
|
|
@ -68,10 +68,11 @@ void Java_org_rocksdb_BackupEngine_createNewBackupWithMetadata(
|
|||
auto* backup_engine = reinterpret_cast<rocksdb::BackupEngine*>(jbe_handle);
|
||||
|
||||
jboolean has_exception = JNI_FALSE;
|
||||
std::string app_metadata = rocksdb::JniUtil::copyStdString(
|
||||
env, japp_metadata, &has_exception);
|
||||
std::string app_metadata =
|
||||
rocksdb::JniUtil::copyStdString(env, japp_metadata, &has_exception);
|
||||
if (has_exception == JNI_TRUE) {
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(env, "Could not copy jstring to std::string");
|
||||
rocksdb::RocksDBExceptionJni::ThrowNew(
|
||||
env, "Could not copy jstring to std::string");
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -2370,14 +2370,16 @@ class BackupInfoJni : public JavaClass {
|
|||
* exception occurs
|
||||
*/
|
||||
static jobject construct0(JNIEnv* env, uint32_t backup_id, int64_t timestamp,
|
||||
uint64_t size, uint32_t number_files, const std::string& app_metadata) {
|
||||
uint64_t size, uint32_t number_files,
|
||||
const std::string& app_metadata) {
|
||||
jclass jclazz = getJClass(env);
|
||||
if(jclazz == nullptr) {
|
||||
// exception occurred accessing class
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
static jmethodID mid = env->GetMethodID(jclazz, "<init>", "(IJJILjava/lang/String;)V");
|
||||
static jmethodID mid =
|
||||
env->GetMethodID(jclazz, "<init>", "(IJJILjava/lang/String;)V");
|
||||
if(mid == nullptr) {
|
||||
// exception occurred accessing method
|
||||
return nullptr;
|
||||
|
@ -2392,8 +2394,8 @@ class BackupInfoJni : public JavaClass {
|
|||
}
|
||||
}
|
||||
|
||||
jobject jbackup_info =
|
||||
env->NewObject(jclazz, mid, backup_id, timestamp, size, number_files, japp_metadata);
|
||||
jobject jbackup_info = env->NewObject(jclazz, mid, backup_id, timestamp,
|
||||
size, number_files, japp_metadata);
|
||||
if(env->ExceptionCheck()) {
|
||||
env->DeleteLocalRef(japp_metadata);
|
||||
return nullptr;
|
||||
|
@ -2448,12 +2450,9 @@ class BackupInfoListJni {
|
|||
for (auto it = backup_infos.begin(); it != end; ++it) {
|
||||
auto backup_info = *it;
|
||||
|
||||
jobject obj = rocksdb::BackupInfoJni::construct0(env,
|
||||
backup_info.backup_id,
|
||||
backup_info.timestamp,
|
||||
backup_info.size,
|
||||
backup_info.number_files,
|
||||
backup_info.app_metadata);
|
||||
jobject obj = rocksdb::BackupInfoJni::construct0(
|
||||
env, backup_info.backup_id, backup_info.timestamp, backup_info.size,
|
||||
backup_info.number_files, backup_info.app_metadata);
|
||||
if(env->ExceptionCheck()) {
|
||||
// exception occurred constructing object
|
||||
if(obj != nullptr) {
|
||||
|
|
|
@ -104,9 +104,8 @@ public class BackupEngine extends RocksObject implements AutoCloseable {
|
|||
*
|
||||
* @throws RocksDBException thrown if a new backup could not be created
|
||||
*/
|
||||
public void createNewBackupWithMetadata(
|
||||
final RocksDB db, final String metadata, final boolean flushBeforeBackup)
|
||||
throws RocksDBException {
|
||||
public void createNewBackupWithMetadata(final RocksDB db, final String metadata,
|
||||
final boolean flushBeforeBackup) throws RocksDBException {
|
||||
assert (isOwningHandle());
|
||||
createNewBackupWithMetadata(nativeHandle_, db.nativeHandle_, metadata, flushBeforeBackup);
|
||||
}
|
||||
|
|
|
@ -19,8 +19,8 @@ public class BackupInfo {
|
|||
* @param size size of backup
|
||||
* @param numberFiles number of files related to this backup.
|
||||
*/
|
||||
BackupInfo(final int backupId, final long timestamp, final long size,
|
||||
final int numberFiles, final String app_metadata) {
|
||||
BackupInfo(final int backupId, final long timestamp, final long size, final int numberFiles,
|
||||
final String app_metadata) {
|
||||
backupId_ = backupId;
|
||||
timestamp_ = timestamp;
|
||||
size_ = size;
|
||||
|
|
|
@ -207,20 +207,17 @@ public class BackupEngineTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void backupDbWithMetadata()
|
||||
throws RocksDBException {
|
||||
public void backupDbWithMetadata() throws RocksDBException {
|
||||
// Open empty database.
|
||||
try(final Options opt = new Options().setCreateIfMissing(true);
|
||||
final RocksDB db = RocksDB.open(opt,
|
||||
dbFolder.getRoot().getAbsolutePath())) {
|
||||
|
||||
try (final Options opt = new Options().setCreateIfMissing(true);
|
||||
final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) {
|
||||
// Fill database with some test values
|
||||
prepareDatabase(db);
|
||||
|
||||
// Create two backups
|
||||
try(final BackupableDBOptions bopt = new BackupableDBOptions(
|
||||
backupFolder.getRoot().getAbsolutePath());
|
||||
final BackupEngine be = BackupEngine.open(opt.getEnv(), bopt)) {
|
||||
try (final BackupableDBOptions bopt =
|
||||
new BackupableDBOptions(backupFolder.getRoot().getAbsolutePath());
|
||||
final BackupEngine be = BackupEngine.open(opt.getEnv(), bopt)) {
|
||||
final String metadata = String.valueOf(ThreadLocalRandom.current().nextInt());
|
||||
be.createNewBackupWithMetadata(db, metadata, true);
|
||||
final List<BackupInfo> backupInfoList = verifyNumberOfValidBackups(be, 1);
|
||||
|
|
|
@ -285,13 +285,14 @@ bool BlockIter::ParseNextKey() {
|
|||
if (global_seqno_ != kDisableGlobalSequenceNumber) {
|
||||
// If we are reading a file with a global sequence number we should
|
||||
// expect that all encoded sequence numbers are zeros and any value
|
||||
// type is kTypeValue, kTypeMerge or kTypeDeletion
|
||||
// type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion.
|
||||
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
|
||||
|
||||
ValueType value_type = ExtractValueType(key_.GetInternalKey());
|
||||
assert(value_type == ValueType::kTypeValue ||
|
||||
value_type == ValueType::kTypeMerge ||
|
||||
value_type == ValueType::kTypeDeletion);
|
||||
value_type == ValueType::kTypeDeletion ||
|
||||
value_type == ValueType::kTypeRangeDeletion);
|
||||
|
||||
if (key_pinned_) {
|
||||
// TODO(tec): Investigate updating the seqno in the loaded block
|
||||
|
|
|
@ -903,6 +903,20 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
|
|||
}
|
||||
}
|
||||
|
||||
// Read the table properties, if provided.
|
||||
if (rep->table_properties) {
|
||||
rep->whole_key_filtering &=
|
||||
IsFeatureSupported(*(rep->table_properties),
|
||||
BlockBasedTablePropertyNames::kWholeKeyFiltering,
|
||||
rep->ioptions.info_log);
|
||||
rep->prefix_filtering &= IsFeatureSupported(
|
||||
*(rep->table_properties),
|
||||
BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
|
||||
|
||||
rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
|
||||
rep->ioptions.info_log);
|
||||
}
|
||||
|
||||
// Read the range del meta block
|
||||
bool found_range_del_block;
|
||||
s = SeekToRangeDelBlock(meta_iter.get(), &found_range_del_block,
|
||||
|
@ -928,20 +942,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
|
|||
}
|
||||
}
|
||||
|
||||
// Determine whether whole key filtering is supported.
|
||||
if (rep->table_properties) {
|
||||
rep->whole_key_filtering &=
|
||||
IsFeatureSupported(*(rep->table_properties),
|
||||
BlockBasedTablePropertyNames::kWholeKeyFiltering,
|
||||
rep->ioptions.info_log);
|
||||
rep->prefix_filtering &= IsFeatureSupported(
|
||||
*(rep->table_properties),
|
||||
BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
|
||||
|
||||
rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
|
||||
rep->ioptions.info_log);
|
||||
}
|
||||
|
||||
bool need_upper_bound_check =
|
||||
PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor);
|
||||
|
||||
|
|
|
@ -101,6 +101,42 @@ struct SstFileWriter::Rep {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
|
||||
if (!builder) {
|
||||
return Status::InvalidArgument("File is not opened");
|
||||
}
|
||||
|
||||
RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
|
||||
if (file_info.num_range_del_entries == 0) {
|
||||
file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
|
||||
tombstone.start_key_.size());
|
||||
file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
|
||||
tombstone.end_key_.size());
|
||||
} else {
|
||||
if (internal_comparator.user_comparator()->Compare(
|
||||
tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
|
||||
file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
|
||||
tombstone.start_key_.size());
|
||||
}
|
||||
if (internal_comparator.user_comparator()->Compare(
|
||||
tombstone.end_key_, file_info.largest_range_del_key) > 0) {
|
||||
file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
|
||||
tombstone.end_key_.size());
|
||||
}
|
||||
}
|
||||
|
||||
auto ikey_and_end_key = tombstone.Serialize();
|
||||
builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
|
||||
|
||||
// update file info
|
||||
file_info.num_range_del_entries++;
|
||||
file_info.file_size = builder->FileSize();
|
||||
|
||||
InvalidatePageCache(false /* closing */);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void InvalidatePageCache(bool closing) {
|
||||
if (invalidate_page_cache == false) {
|
||||
// Fadvise disabled
|
||||
|
@ -209,10 +245,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
|||
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
|
||||
table_builder_options, cf_id, r->file_writer.get()));
|
||||
|
||||
r->file_info = ExternalSstFileInfo();
|
||||
r->file_info.file_path = file_path;
|
||||
r->file_info.file_size = 0;
|
||||
r->file_info.num_entries = 0;
|
||||
r->file_info.sequence_number = 0;
|
||||
r->file_info.version = 2;
|
||||
return s;
|
||||
}
|
||||
|
@ -233,12 +267,18 @@ Status SstFileWriter::Delete(const Slice& user_key) {
|
|||
return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
|
||||
}
|
||||
|
||||
Status SstFileWriter::DeleteRange(const Slice& begin_key,
|
||||
const Slice& end_key) {
|
||||
return rep_->DeleteRange(begin_key, end_key);
|
||||
}
|
||||
|
||||
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
|
||||
Rep* r = rep_.get();
|
||||
if (!r->builder) {
|
||||
return Status::InvalidArgument("File is not opened");
|
||||
}
|
||||
if (r->file_info.num_entries == 0) {
|
||||
if (r->file_info.num_entries == 0 &&
|
||||
r->file_info.num_range_del_entries == 0) {
|
||||
return Status::InvalidArgument("Cannot create sst file with no entries");
|
||||
}
|
||||
|
||||
|
|
|
@ -975,8 +975,8 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
|
|||
for (auto& itr : backuped_file_infos_) {
|
||||
if (itr.second->refs == 0) {
|
||||
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
|
||||
s.ToString().c_str());
|
||||
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
|
||||
itr.first.c_str(), s.ToString().c_str());
|
||||
to_delete.push_back(itr.first);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue