In all the places where log records are read, there was a check that

record.size() should not be less than 12.

This "magic number" seems to be the WriteBatch header (8 byte sequence
and 4 byte count).   Replaced all the places where "12" was used
by WriteBatchInternal::kHeader.
This commit is contained in:
zensan 2016-03-30 23:05:22 +05:30
parent e3802531f1
commit 78711524b7
7 changed files with 12 additions and 10 deletions

View File

@ -1186,7 +1186,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue_replay_log && continue_replay_log &&
reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) && reader.ReadRecord(&record, &scratch, db_options_.wal_recovery_mode) &&
status.ok()) { status.ok()) {
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
continue; continue;

View File

@ -265,7 +265,7 @@ class Repairer {
mem->Ref(); mem->Ref();
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption( reporter.Corruption(
record.size(), Status::Corruption("log record too small")); record.size(), Status::Corruption("log record too small"));
continue; continue;

View File

@ -107,7 +107,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
return; return;
} }
while (RestrictedRead(&record, &scratch)) { while (RestrictedRead(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption( reporter_.Corruption(
record.size(), Status::Corruption("very small log record")); record.size(), Status::Corruption("very small log record"));
continue; continue;
@ -167,7 +167,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
currentLogReader_->UnmarkEOF(); currentLogReader_->UnmarkEOF();
} }
while (RestrictedRead(&record, &scratch)) { while (RestrictedRead(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption( reporter_.Corruption(
record.size(), Status::Corruption("very small log record")); record.size(), Status::Corruption("very small log record"));
continue; continue;

View File

@ -455,7 +455,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
if (reader.ReadRecord(&record, &scratch) && if (reader.ReadRecord(&record, &scratch) &&
(status.ok() || !db_options_.paranoid_checks)) { (status.ok() || !db_options_.paranoid_checks)) {
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
// TODO read record's till the first no corrupt entry? // TODO read record's till the first no corrupt entry?

View File

@ -81,8 +81,6 @@ struct BatchContentClassifier : public WriteBatch::Handler {
} // anon namespace } // anon namespace
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
struct SavePoint { struct SavePoint {
size_t size; // size of rep_ size_t size; // size of rep_
@ -96,8 +94,8 @@ struct SavePoints {
WriteBatch::WriteBatch(size_t reserved_bytes) WriteBatch::WriteBatch(size_t reserved_bytes)
: save_points_(nullptr), content_flags_(0), rep_() { : save_points_(nullptr), content_flags_(0), rep_() {
rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader);
rep_.resize(kHeader); rep_.resize(WriteBatchInternal::kHeader);
} }
WriteBatch::WriteBatch(const std::string& rep) WriteBatch::WriteBatch(const std::string& rep)

View File

@ -63,6 +63,10 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
// WriteBatch that we don't want in the public WriteBatch interface. // WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal { class WriteBatchInternal {
public: public:
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
// WriteBatch methods with column_family_id instead of ColumnFamilyHandle* // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
static void Put(WriteBatch* batch, uint32_t column_family_id, static void Put(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value); const Slice& key, const Slice& value);

View File

@ -1576,7 +1576,7 @@ void DumpWalFile(std::string wal_file, bool print_header, bool print_values,
} }
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {
row.str(""); row.str("");
if (record.size() < 12) { if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
} else { } else {