verify files by just opening the them

This commit is contained in:
Jay Huh 2024-10-11 14:46:35 -07:00
parent e3031a9901
commit 9f55030b0d
2 changed files with 61 additions and 9 deletions

View file

@ -351,7 +351,7 @@ Status CompactionServiceCompactionJob::Run() {
TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
&compaction_result_);
// Verify compaction result using SST File Reader
// Sanity check by try opening the output file using SST File Reader
if (status.ok()) {
// GetLatestCFOptions() normally requires DB Mutex held, but we don't expect
// CFs to change in the remote worker, so this should be okay
@ -360,12 +360,7 @@ Status CompactionServiceCompactionJob::Run() {
SstFileReader reader(options);
for (const auto& output_file : compaction_result_->output_files) {
status = reader.Open(output_path_ + "/" + output_file.file_name);
if (status.ok()) {
status = reader.VerifyChecksum(ReadOptions());
if (!status.ok()) {
break;
}
} else {
if (!status.ok()) {
break;
}
}

View file

@ -409,17 +409,25 @@ TEST_F(CompactionServiceTest, CorruptedOutput) {
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionServiceCompactionJob::Run:0", [&](void* arg) {
CompactionServiceResult* compaction_result =
*(static_cast<CompactionServiceResult**>(arg));
ASSERT_TRUE(compaction_result != nullptr &&
!compaction_result->output_files.empty());
// Corrupt files here
// Corrupt or truncate files here
for (const auto& output_file : compaction_result->output_files) {
std::string file_name =
compaction_result->output_path + "/" + output_file.file_name;
ASSERT_OK(test::CorruptFile(env_, file_name, 0, 10,
uint64_t file_size = 0;
Status s = options.env->GetFileSize(file_name, &file_size);
ASSERT_OK(s);
ASSERT_GT(file_size, 0);
ASSERT_OK(test::CorruptFile(env_, file_name, 0,
static_cast<int>(file_size),
true /* verifyChecksum */));
}
});
@ -439,6 +447,55 @@ TEST_F(CompactionServiceTest, CorruptedOutput) {
ASSERT_TRUE(result.stats.is_remote_compaction);
}
TEST_F(CompactionServiceTest, TruncatedOutput) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();
auto my_cs = GetCompactionService();
std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionServiceCompactionJob::Run:0", [&](void* arg) {
CompactionServiceResult* compaction_result =
*(static_cast<CompactionServiceResult**>(arg));
ASSERT_TRUE(compaction_result != nullptr &&
!compaction_result->output_files.empty());
// Corrupt or truncate files here
for (const auto& output_file : compaction_result->output_files) {
std::string file_name =
compaction_result->output_path + "/" + output_file.file_name;
uint64_t file_size = 0;
Status s = options.env->GetFileSize(file_name, &file_size);
ASSERT_OK(s);
ASSERT_GT(file_size, 0);
ASSERT_OK(test::TruncateFile(env_, file_name, file_size / 2));
}
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
CompactionServiceResult result;
my_cs->GetResult(&result);
ASSERT_NOK(result.status);
ASSERT_TRUE(result.status.IsCorruption());
ASSERT_TRUE(result.stats.is_manual_compaction);
ASSERT_TRUE(result.stats.is_remote_compaction);
}
TEST_F(CompactionServiceTest, CustomFileChecksum) {
Options options = CurrentOptions();
options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();