Add file_checksum from FileChecksumGenFactory and Tests for corrupted output (#13060)

Summary:
- When `FileChecksumGenFactory` is set, include the `file_checksum` and `file_checksum_func_name` in the output file metadata
- ~~In Remote Compaction, try opening the output files in the temporary directory to do a quick sanity check before returning the result with status.~~
- After offline discussion, we decided to rely on the Primary's existing Compaction flow to sanity check the output files. If an output file is corrupted, we will still be able to catch it and will not install it, even after it has been renamed into cf_paths. The corrupted file in the cf_path won't be added to the MANIFEST and will be purged as part of the next `PurgeObsoleteFiles()` call.
- Unit tests have been added to validate the above.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13060

Test Plan:
Unit test added
```
./compaction_service_test --gtest_filter="*CorruptedOutput*"
./compaction_service_test --gtest_filter="*TruncatedOutput*"
./compaction_service_test --gtest_filter="*CustomFileChecksum*"
./compaction_job_test --gtest_filter="*ResultSerialization*"
```

Reviewed By: cbi42

Differential Revision: D64189645

Pulled By: jaykorean

fbshipit-source-id: 6cf28720169c960c80df257806bfee3c0d177159
This commit is contained in:
Jay Huh 2024-10-14 18:26:17 -07:00 committed by Facebook GitHub Bot
parent 351d2fd2b6
commit dd76862b00
4 changed files with 291 additions and 17 deletions

View File

@ -413,19 +413,22 @@ struct CompactionServiceOutputFile {
SequenceNumber largest_seqno; SequenceNumber largest_seqno;
std::string smallest_internal_key; std::string smallest_internal_key;
std::string largest_internal_key; std::string largest_internal_key;
uint64_t oldest_ancester_time; uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
uint64_t file_creation_time; uint64_t file_creation_time = kUnknownFileCreationTime;
uint64_t epoch_number; uint64_t epoch_number = kUnknownEpochNumber;
std::string file_checksum = kUnknownFileChecksum;
std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
uint64_t paranoid_hash; uint64_t paranoid_hash;
bool marked_for_compaction; bool marked_for_compaction;
UniqueId64x2 unique_id; UniqueId64x2 unique_id{};
CompactionServiceOutputFile() = default; CompactionServiceOutputFile() = default;
CompactionServiceOutputFile( CompactionServiceOutputFile(
const std::string& name, SequenceNumber smallest, SequenceNumber largest, const std::string& name, SequenceNumber smallest, SequenceNumber largest,
std::string _smallest_internal_key, std::string _largest_internal_key, std::string _smallest_internal_key, std::string _largest_internal_key,
uint64_t _oldest_ancester_time, uint64_t _file_creation_time, uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
uint64_t _epoch_number, uint64_t _paranoid_hash, uint64_t _epoch_number, const std::string& _file_checksum,
const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
bool _marked_for_compaction, UniqueId64x2 _unique_id) bool _marked_for_compaction, UniqueId64x2 _unique_id)
: file_name(name), : file_name(name),
smallest_seqno(smallest), smallest_seqno(smallest),
@ -435,6 +438,8 @@ struct CompactionServiceOutputFile {
oldest_ancester_time(_oldest_ancester_time), oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time), file_creation_time(_file_creation_time),
epoch_number(_epoch_number), epoch_number(_epoch_number),
file_checksum(_file_checksum),
file_checksum_func_name(_file_checksum_func_name),
paranoid_hash(_paranoid_hash), paranoid_hash(_paranoid_hash),
marked_for_compaction(_marked_for_compaction), marked_for_compaction(_marked_for_compaction),
unique_id(std::move(_unique_id)) {} unique_id(std::move(_unique_id)) {}

View File

@ -1656,15 +1656,26 @@ TEST_F(CompactionJobTest, ResultSerialization) {
}; };
result.status = result.status =
status_list.at(rnd.Uniform(static_cast<int>(status_list.size()))); status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
std::string file_checksum = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
std::string file_checksum_func_name = "MyAwesomeChecksumGenerator";
while (!rnd.OneIn(10)) { while (!rnd.OneIn(10)) {
UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)}; UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
result.output_files.emplace_back( result.output_files.emplace_back(
rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX), rnd.RandomString(rnd.Uniform(kStrMaxLen)) /* file_name */,
rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX) /* smallest_seqno */,
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX) /* largest_seqno */,
rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)), rnd.RandomBinaryString(
rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), rnd.Uniform(kStrMaxLen)) /* smallest_internal_key */,
rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id); rnd.RandomBinaryString(
rnd.Uniform(kStrMaxLen)) /* largest_internal_key */,
rnd64.Uniform(UINT64_MAX) /* oldest_ancester_time */,
rnd64.Uniform(UINT64_MAX) /* file_creation_time */,
rnd64.Uniform(UINT64_MAX) /* epoch_number */,
file_checksum /* file_checksum */,
file_checksum_func_name /* file_checksum_func_name */,
rnd64.Uniform(UINT64_MAX) /* paranoid_hash */,
rnd.OneIn(2) /* marked_for_compaction */, id);
} }
result.output_level = rnd.Uniform(10); result.output_level = rnd.Uniform(10);
result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen)); result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
@ -1700,6 +1711,10 @@ TEST_F(CompactionJobTest, ResultSerialization) {
ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch)); ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
ASSERT_EQ(mismatch, "output_files.unique_id"); ASSERT_EQ(mismatch, "output_files.unique_id");
deserialized_tmp.status.PermitUncheckedError(); deserialized_tmp.status.PermitUncheckedError();
ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum, file_checksum);
ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum_func_name,
file_checksum_func_name);
} }
// Test unknown field // Test unknown field

View File

@ -195,6 +195,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
meta.oldest_ancester_time = file.oldest_ancester_time; meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time; meta.file_creation_time = file.file_creation_time;
meta.epoch_number = file.epoch_number; meta.epoch_number = file.epoch_number;
meta.file_checksum = file.file_checksum;
meta.file_checksum_func_name = file.file_checksum_func_name;
meta.marked_for_compaction = file.marked_for_compaction; meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id; meta.unique_id = file.unique_id;
@ -320,9 +322,6 @@ Status CompactionServiceCompactionJob::Run() {
if (status.ok()) { if (status.ok()) {
status = io_s; status = io_s;
} }
if (status.ok()) {
// TODO: Add verify_table()
}
// Finish up all book-keeping to unify the subcompaction results // Finish up all book-keeping to unify the subcompaction results
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_); compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
@ -343,10 +342,14 @@ Status CompactionServiceCompactionJob::Run() {
MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno, MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.smallest.Encode().ToString(), meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time, meta.largest.Encode().ToString(), meta.oldest_ancester_time,
meta.file_creation_time, meta.epoch_number, meta.file_creation_time, meta.epoch_number, meta.file_checksum,
output_file.validator.GetHash(), meta.marked_for_compaction, meta.file_checksum_func_name, output_file.validator.GetHash(),
meta.unique_id); meta.marked_for_compaction, meta.unique_id);
} }
TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
&compaction_result_);
InternalStats::CompactionStatsFull compaction_stats; InternalStats::CompactionStatsFull compaction_stats;
sub_compact->AggregateCompactionStats(compaction_stats); sub_compact->AggregateCompactionStats(compaction_stats);
compaction_result_->num_output_records = compaction_result_->num_output_records =
@ -471,6 +474,14 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct CompactionServiceOutputFile, epoch_number), {offsetof(struct CompactionServiceOutputFile, epoch_number),
OptionType::kUInt64T, OptionVerificationType::kNormal, OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}}, OptionTypeFlags::kNone}},
{"file_checksum",
{offsetof(struct CompactionServiceOutputFile, file_checksum),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_checksum_func_name",
{offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"paranoid_hash", {"paranoid_hash",
{offsetof(struct CompactionServiceOutputFile, paranoid_hash), {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
OptionType::kUInt64T, OptionVerificationType::kNormal, OptionType::kUInt64T, OptionVerificationType::kNormal,

View File

@ -396,6 +396,249 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
ASSERT_TRUE(result.stats.is_remote_compaction); ASSERT_TRUE(result.stats.is_remote_compaction);
} }
// Corrupts every output file produced by the remote compaction worker from
// inside the "CompactionServiceCompactionJob::Run:0" sync point, then checks
// that CompactRange() on the primary fails with Corruption while the worker's
// own result still reports an OK status.
TEST_F(CompactionServiceTest, CorruptedOutput) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto compaction_service = GetCompactionService();

  std::string begin_key = Key(15);
  std::string end_key = Key(45);
  Slice range_begin(begin_key);
  Slice range_end(end_key);
  uint64_t compactions_before = compaction_service->GetCompactionNum();

  // The sync point hands over a pointer to the worker's
  // CompactionServiceResult pointer.
  auto corrupt_all_outputs = [&](void* arg) {
    CompactionServiceResult* worker_result =
        *(static_cast<CompactionServiceResult**>(arg));
    ASSERT_TRUE(worker_result != nullptr);
    ASSERT_FALSE(worker_result->output_files.empty());

    // Corrupt files here
    for (const auto& out : worker_result->output_files) {
      std::string path = worker_result->output_path + "/" + out.file_name;

      uint64_t size_in_bytes = 0;
      ASSERT_OK(options.env->GetFileSize(path, &size_in_bytes));
      ASSERT_GT(size_in_bytes, 0);

      // Arguments 0 and the file size presumably select the entire file
      // for corruption -- confirm against test::CorruptFile().
      ASSERT_OK(test::CorruptFile(env_, path, 0,
                                  static_cast<int>(size_in_bytes),
                                  true /* verifyChecksum */));
    }
  };
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionServiceCompactionJob::Run:0", corrupt_all_outputs);
  SyncPoint::GetInstance()->EnableProcessing();

  // CompactRange() should fail
  Status compact_status =
      db_->CompactRange(CompactRangeOptions(), &range_begin, &range_end);
  ASSERT_NOK(compact_status);
  ASSERT_TRUE(compact_status.IsCorruption());
  ASSERT_GE(compaction_service->GetCompactionNum(), compactions_before + 1);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // On the worker side, the compaction is considered success
  // Verification is done on the primary side
  CompactionServiceResult remote_result;
  compaction_service->GetResult(&remote_result);
  ASSERT_OK(remote_result.status);
  ASSERT_TRUE(remote_result.stats.is_manual_compaction);
  ASSERT_TRUE(remote_result.stats.is_remote_compaction);
}
// Runs the same scenario twice, with paranoid_file_checks off and on. A tiny
// corruption is injected into each remote-compaction output file; the test
// expects CompactRange() to succeed when the paranoid check is disabled and
// to fail with "Paranoid checksums do not match" when it is enabled. In both
// cases the worker-side result is expected to be OK.
TEST_F(CompactionServiceTest, CorruptedOutputParanoidFileCheck) {
  for (bool paranoid_on : {false, true}) {
    SCOPED_TRACE("paranoid_file_check_enabled=" + std::to_string(paranoid_on));

    Options options = CurrentOptions();
    Destroy(options);
    options.disable_auto_compactions = true;
    options.paranoid_file_checks = paranoid_on;
    ReopenWithCompactionService(&options);
    GenerateTestData();

    auto compaction_service = GetCompactionService();

    std::string begin_key = Key(15);
    std::string end_key = Key(45);
    Slice range_begin(begin_key);
    Slice range_end(end_key);
    uint64_t compactions_before = compaction_service->GetCompactionNum();

    // The sync point hands over a pointer to the worker's
    // CompactionServiceResult pointer.
    auto corrupt_a_little = [&](void* arg) {
      CompactionServiceResult* worker_result =
          *(static_cast<CompactionServiceResult**>(arg));
      ASSERT_TRUE(worker_result != nullptr);
      ASSERT_FALSE(worker_result->output_files.empty());

      // Corrupt files here
      for (const auto& out : worker_result->output_files) {
        std::string path = worker_result->output_path + "/" + out.file_name;

        // Corrupt very small range of bytes. This corruption is so small
        // that this isn't caught by default light-weight check
        ASSERT_OK(
            test::CorruptFile(env_, path, 0, 1, false /* verifyChecksum */));
      }
    };
    SyncPoint::GetInstance()->SetCallBack(
        "CompactionServiceCompactionJob::Run:0", corrupt_a_little);
    SyncPoint::GetInstance()->EnableProcessing();

    Status compact_status =
        db_->CompactRange(CompactRangeOptions(), &range_begin, &range_end);
    if (!paranoid_on) {
      // CompactRange() goes through if paranoid file check is not enabled
      ASSERT_OK(compact_status);
    } else {
      ASSERT_NOK(compact_status);
      ASSERT_EQ(Status::Corruption("Paranoid checksums do not match"),
                compact_status);
    }
    ASSERT_GE(compaction_service->GetCompactionNum(), compactions_before + 1);

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();

    // On the worker side, the compaction is considered success
    // Verification is done on the primary side
    CompactionServiceResult remote_result;
    compaction_service->GetResult(&remote_result);
    ASSERT_OK(remote_result.status);
    ASSERT_TRUE(remote_result.stats.is_manual_compaction);
    ASSERT_TRUE(remote_result.stats.is_remote_compaction);
  }
}
// Truncates every remote-compaction output file to half its size from inside
// the "CompactionServiceCompactionJob::Run:0" sync point, then checks that
// CompactRange() on the primary fails with Corruption while the worker's own
// result still reports an OK status.
TEST_F(CompactionServiceTest, TruncatedOutput) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto compaction_service = GetCompactionService();

  std::string begin_key = Key(15);
  std::string end_key = Key(45);
  Slice range_begin(begin_key);
  Slice range_end(end_key);
  uint64_t compactions_before = compaction_service->GetCompactionNum();

  // The sync point hands over a pointer to the worker's
  // CompactionServiceResult pointer.
  auto truncate_all_outputs = [&](void* arg) {
    CompactionServiceResult* worker_result =
        *(static_cast<CompactionServiceResult**>(arg));
    ASSERT_TRUE(worker_result != nullptr);
    ASSERT_FALSE(worker_result->output_files.empty());

    // Truncate files here
    for (const auto& out : worker_result->output_files) {
      std::string path = worker_result->output_path + "/" + out.file_name;

      uint64_t size_in_bytes = 0;
      ASSERT_OK(options.env->GetFileSize(path, &size_in_bytes));
      ASSERT_GT(size_in_bytes, 0);

      // Keep only the first half of the file.
      ASSERT_OK(test::TruncateFile(env_, path, size_in_bytes / 2));
    }
  };
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionServiceCompactionJob::Run:0", truncate_all_outputs);
  SyncPoint::GetInstance()->EnableProcessing();

  // CompactRange() should fail
  Status compact_status =
      db_->CompactRange(CompactRangeOptions(), &range_begin, &range_end);
  ASSERT_NOK(compact_status);
  ASSERT_TRUE(compact_status.IsCorruption());
  ASSERT_GE(compaction_service->GetCompactionNum(), compactions_before + 1);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // On the worker side, the compaction is considered success
  // Verification is done on the primary side
  CompactionServiceResult remote_result;
  compaction_service->GetResult(&remote_result);
  ASSERT_OK(remote_result.status);
  ASSERT_TRUE(remote_result.stats.is_manual_compaction);
  ASSERT_TRUE(remote_result.stats.is_remote_compaction);
}
// With a CRC32C file checksum generator factory configured, verifies that
// each output file reported by the remote compaction worker carries a
// file_checksum / file_checksum_func_name that matches a checksum recomputed
// here from the file's bytes on disk.
TEST_F(CompactionServiceTest, CustomFileChecksum) {
  Options options = CurrentOptions();
  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto my_cs = GetCompactionService();

  std::string start_str = Key(15);
  std::string end_str = Key(45);
  Slice start(start_str);
  Slice end(end_str);
  uint64_t comp_num = my_cs->GetCompactionNum();

  SyncPoint::GetInstance()->SetCallBack(
      "CompactionServiceCompactionJob::Run:0", [&](void* arg) {
        // The sync point hands over a pointer to the worker's
        // CompactionServiceResult pointer.
        CompactionServiceResult* compaction_result =
            *(static_cast<CompactionServiceResult**>(arg));
        ASSERT_TRUE(compaction_result != nullptr &&
                    !compaction_result->output_files.empty());

        // Validate Checksum files here
        for (const auto& output_file : compaction_result->output_files) {
          std::string file_name =
              compaction_result->output_path + "/" + output_file.file_name;

          FileChecksumGenContext gen_context;
          gen_context.file_name = file_name;
          std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
              options.file_checksum_gen_factory->CreateFileChecksumGenerator(
                  gen_context);

          std::unique_ptr<SequentialFile> file_reader;
          uint64_t file_size = 0;
          Status s = options.env->GetFileSize(file_name, &file_size);
          ASSERT_OK(s);
          ASSERT_GT(file_size, 0);

          s = options.env->NewSequentialFile(file_name, &file_reader,
                                             EnvOptions());
          ASSERT_OK(s);

          Slice contents;
          std::unique_ptr<char[]> scratch(new char[file_size]);
          s = file_reader->Read(file_size, &contents, scratch.get());
          ASSERT_OK(s);
          // Make a short read fail loudly here rather than as an opaque
          // checksum mismatch below.
          ASSERT_EQ(file_size, contents.size());

          // Hash the bytes described by the returned Slice: Read() reports
          // the data through `contents`, which is not guaranteed to point
          // into `scratch`.
          file_checksum_gen->Update(contents.data(), contents.size());
          file_checksum_gen->Finalize();

          // Verify actual checksum and the func name
          ASSERT_EQ(file_checksum_gen->Name(),
                    output_file.file_checksum_func_name);
          ASSERT_EQ(file_checksum_gen->GetChecksum(),
                    output_file.file_checksum);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  CompactionServiceResult result;
  my_cs->GetResult(&result);
  ASSERT_OK(result.status);
  ASSERT_TRUE(result.stats.is_manual_compaction);
  ASSERT_TRUE(result.stats.is_remote_compaction);
}
TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;