Update Remote Compaction Tests to include more than one CF (#12430)

Summary:
Update `compaction_service_test` to make sure remote compaction works with multiple column family set up. Minor refactor to get rid of duplicate code

Fixing one quick bug in the existing test util: Test util's `FilesPerLevel` didn't honor `cf_id` properly)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12430

Test Plan:
```
./compaction_service_test
```

Reviewed By: ajkr

Differential Revision: D54883035

Pulled By: jaykorean

fbshipit-source-id: 83b4f6f566fed5c4824bfef7de01074354a72b44
This commit is contained in:
Jay Huh 2024-03-18 15:40:48 -07:00 committed by Facebook GitHub Bot
parent 2443ebf810
commit b4e9f5a400
2 changed files with 44 additions and 116 deletions

View File

@ -177,6 +177,7 @@ class CompactionServiceTest : public DBTestBase {
remote_table_properties_collector_factories);
options->compaction_service = compaction_service_;
DestroyAndReopen(*options);
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, *options);
}
Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
@ -188,32 +189,40 @@ class CompactionServiceTest : public DBTestBase {
return static_cast_with_check<MyTestCompactionService>(cs);
}
void GenerateTestData() {
// Generate 20 files @ L2
void GenerateTestData(bool move_files_manually = false) {
// Generate 20 files @ L2 Per CF
for (int cf_id = 0; cf_id < static_cast<int>(handles_.size()); cf_id++) {
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
ASSERT_OK(Put(cf_id, Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
ASSERT_OK(Flush(cf_id));
}
if (move_files_manually) {
MoveFilesToLevel(2, cf_id);
}
MoveFilesToLevel(2);
// Generate 10 files @ L1 overlap with all 20 files @ L2
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
ASSERT_OK(
Put(cf_id, Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush(cf_id));
}
if (move_files_manually) {
MoveFilesToLevel(1, cf_id);
ASSERT_EQ(FilesPerLevel(cf_id), "0,10,20");
}
ASSERT_OK(Flush());
}
MoveFilesToLevel(1);
ASSERT_EQ(FilesPerLevel(), "0,10,20");
}
void VerifyTestData() {
for (int cf_id = 0; cf_id < static_cast<int>(handles_.size()); cf_id++) {
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
auto result = Get(cf_id, Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
@ -221,6 +230,7 @@ class CompactionServiceTest : public DBTestBase {
}
}
}
}
std::vector<std::shared_ptr<EventListener>> remote_listeners;
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
@ -239,32 +249,10 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
Statistics* primary_statistics = GetPrimaryStatistics();
Statistics* compactor_statistics = GetCompactorStatistics();
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
GenerateTestData();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
VerifyTestData();
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
@ -327,7 +315,8 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
assert(*id != kNullUniqueId64x2);
verify_passed++;
});
Reopen(options);
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"},
options);
ASSERT_GT(verify_passed, 0);
Close();
}
@ -495,26 +484,9 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
new PartialDeleteCompactionFilter());
options.compaction_filter = delete_comp_filter.get();
ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
GenerateTestData();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
@ -556,7 +528,7 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) {
options.level0_file_num_compaction_trigger = 100;
options.max_background_jobs = 20;
ReopenWithCompactionService(&options);
GenerateTestData();
GenerateTestData(true);
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
@ -575,14 +547,7 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
VerifyTestData();
auto my_cs = GetCompactionService();
ASSERT_EQ(my_cs->GetCompactionNum(), 10);
ASSERT_EQ(FilesPerLevel(), "0,0,10");
@ -592,21 +557,7 @@ TEST_F(CompactionServiceTest, CompactionInfo) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
GenerateTestData();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
auto my_cs =
static_cast_with_check<MyTestCompactionService>(GetCompactionService());
@ -681,32 +632,9 @@ TEST_F(CompactionServiceTest, FallbackLocalAuto) {
my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}
GenerateTestData();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
VerifyTestData();
ASSERT_EQ(my_cs->GetCompactionNum(), 0);

View File

@ -1165,7 +1165,7 @@ int DBTestBase::TotalTableFiles(int cf, int levels) {
// Return spread of files per level
std::string DBTestBase::FilesPerLevel(int cf) {
int num_levels =
(cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
(cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[cf]);
std::string result;
size_t last_non_zero_offset = 0;
for (int level = 0; level < num_levels; level++) {