log_reader: introduce kBadHeader; drop wal mode from ReadPhysicalRecord

Move the WAL recovery mode logic out of ReadPhysicalRecord.  To do this we
introduce a new type indicating when we fail to read a valid header.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-10-12 17:04:33 -04:00
parent 9c33f64d19
commit 4104e9bb67
2 changed files with 24 additions and 20 deletions

View file

@ -91,8 +91,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
Slice fragment;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type =
ReadPhysicalRecord(&fragment, wal_recovery_mode);
size_t drop_size;
const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
switch (record_type) {
case kFullType:
if (in_fragmented_record && !scratch->empty()) {
@ -142,6 +142,13 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
}
break;
case kBadHeader:
if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(drop_size, "truncated header");
}
// fall-thru
case kEof:
if (in_fragmented_record) {
if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
@ -256,8 +263,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
}
}
unsigned int Reader::ReadPhysicalRecord(Slice* result,
WALRecoveryMode wal_recovery_mode) {
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_ && !read_error_) {
@ -280,10 +286,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Unless explicitly requested we don't
// considering this an error, just report EOF.
if (buffer_.size() &&
wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(buffer_.size(), "truncated header");
if (buffer_.size()) {
*drop_size = buffer_.size();
buffer_.clear();
return kBadHeader;
}
buffer_.clear();
return kEof;
@ -297,19 +303,17 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
*drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length");
ReportCorruption(*drop_size, "bad record length");
return kBadRecord;
}
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption unless requested.
if (drop_size &&
wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
// in clean shutdown we don't expect any error in the log files
ReportCorruption(drop_size, "truncated header");
if (*drop_size) {
return kBadHeader;
}
return kEof;
}
@ -333,9 +337,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result,
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
*drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
ReportCorruption(*drop_size, "checksum mismatch");
return kBadRecord;
}
}

View file

@ -121,7 +121,9 @@ class Reader {
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2
kBadRecord = kMaxRecordType + 2,
// Returned when we fail to read a valid header.
kBadHeader = kMaxRecordType + 3,
};
// Skips all blocks that are completely before "initial_offset_".
@ -130,9 +132,7 @@ class Reader {
bool SkipToInitialBlock();
// Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(
Slice* result, WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.