Fix recycled WAL detection when wal_compression is enabled (#12643)

Summary:
I think the point of the `if (end_of_buffer_offset_ - buffer_.size() == 0)` was to only set `recycled_` when the first record was read. However, the condition was false when reading the first record when the WAL began with a  `kSetCompressionType` record because we had already dropped the `kSetCompressionType` record from `buffer_`. To fix this, I used `first_record_read_` instead.

Also, it was pretty confusing to treat the WAL as non-recycled when a recyclable record first appeared in a non-first record. I changed it to return an error if that happens.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12643

Reviewed By: hx235

Differential Revision: D57238099

Pulled By: ajkr

fbshipit-source-id: e20a2a0c9cf0c9510a7b6af463650a05d559239e
This commit is contained in:
Andrew Kryczka 2024-05-22 15:34:37 -07:00 committed by Facebook GitHub Bot
parent db0960800a
commit c72ee4531b
3 changed files with 46 additions and 4 deletions

View File

@ -487,9 +487,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
type == kRecyclableUserDefinedTimestampSizeType);
if (is_recyclable_type) {
header_size = kRecyclableHeaderSize;
if (end_of_buffer_offset_ - buffer_.size() == 0) {
recycled_ = true;
if (first_record_read_ && !recycled_) {
// A recycled log should have started with a recycled record
return kBadRecord;
}
recycled_ = true;
// We need enough for the larger header
if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
int r = kEof;
@ -867,9 +869,12 @@ bool FragmentBufferedReader::TryReadFragment(
int header_size = kHeaderSize;
if ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
type == kRecyclableUserDefinedTimestampSizeType) {
if (end_of_buffer_offset_ - buffer_.size() == 0) {
recycled_ = true;
if (first_record_read_ && !recycled_) {
// A recycled log should have started with a recycled record
*fragment_type_or_err = kBadRecord;
return true;
}
recycled_ = true;
header_size = kRecyclableHeaderSize;
while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
size_t old_size = buffer_.size();

View File

@ -1157,6 +1157,42 @@ TEST_P(CompressionLogTest, AlignedFragmentation) {
ASSERT_EQ("EOF", Read());
}
TEST_P(CompressionLogTest, ChecksumMismatch) {
const CompressionType kCompressionType = std::get<2>(GetParam());
const bool kCompressionEnabled = kCompressionType != kNoCompression;
const bool kRecyclableLog = (std::get<0>(GetParam()) != 0);
if (!StreamingCompressionTypeSupported(kCompressionType)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
Write("foooooo");
int header_len;
if (kRecyclableLog) {
header_len = kRecyclableHeaderSize;
} else {
header_len = kHeaderSize;
}
int compression_record_len;
if (kCompressionEnabled) {
compression_record_len = header_len + 4;
} else {
compression_record_len = 0;
}
IncrementByte(compression_record_len + header_len /* offset */,
14 /* delta */);
ASSERT_EQ("EOF", Read());
if (!kRecyclableLog) {
ASSERT_GT(DroppedBytes(), 0U);
ASSERT_EQ("OK", MatchError("checksum mismatch"));
} else {
ASSERT_EQ(0U, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}
}
INSTANTIATE_TEST_CASE_P(
Compression, CompressionLogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),

View File

@ -0,0 +1 @@
* Fixed a false positive `Status::Corruption` reported when reopening a DB that used `DBOptions::recycle_log_file_num > 0` and `DBOptions::wal_compression != kNoCompression`.