log_writer: pass log number and whether recycling is enabled to ctor

When we recycle log files, we need to mix the log number into the CRC
for each record.  Note that for logs that don't get recycled (like the
manifest), we always pass a log_number of 0 and false.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-10-08 13:07:15 -04:00
parent 666376150c
commit 5830c699f2
10 changed files with 78 additions and 62 deletions

View file

@ -198,7 +198,7 @@ class CompactionJobTest : public testing::Test {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_));
{
log::Writer log(std::move(file_writer));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);

View file

@ -420,7 +420,7 @@ Status DBImpl::NewDB() {
file->SetPreallocationBlockSize(db_options_.manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
@ -4117,7 +4117,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutable_cf_options.write_buffer_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt));
new_log = new log::Writer(std::move(file_writer));
new_log = new log::Writer(std::move(file_writer),
new_log_number,
db_options_.recycle_log_file_num > 0);
}
}
@ -4756,8 +4758,11 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl->logfile_number_ = new_log_number;
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_options));
impl->logs_.emplace_back(new_log_number,
new log::Writer(std::move(file_writer)));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(std::move(file_writer),
new_log_number,
impl->db_options_.recycle_log_file_num > 0));
// set column family handles
for (auto cf : column_families) {

View file

@ -5074,7 +5074,9 @@ class RecoveryTestHelper {
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
current_log_writer.reset(new log::Writer(std::move(file_writer)));
current_log_writer.reset(new log::Writer(
std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0));
for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString(count++);

View file

@ -61,7 +61,7 @@ class FlushJobTest : public testing::Test {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
{
log::Writer log(std::move(file_writer));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);

View file

@ -43,7 +43,7 @@ static std::string RandomSkewedString(int i, Random* rnd) {
return BigString(NumberString(i), rnd->Skewed(17));
}
class LogTest : public testing::Test {
class LogTest : public ::testing::TestWithParam<int> {
private:
class StringSource : public SequentialFile {
public:
@ -158,12 +158,11 @@ class LogTest : public testing::Test {
public:
LogTest()
: reader_contents_(),
dest_holder_(
test::GetWritableFileWriter(
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_))),
source_holder_(
test::GetSequentialFileReader(new StringSource(reader_contents_))),
writer_(std::move(dest_holder_)),
writer_(std::move(dest_holder_), 123, GetParam()),
reader_(std::move(source_holder_), &report_, true /*checksum*/,
0 /*initial_offset*/) {}
@ -298,9 +297,9 @@ uint64_t LogTest::initial_offset_last_record_offsets_[] =
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_F(LogTest, ReadWrite) {
TEST_P(LogTest, ReadWrite) {
Write("foo");
Write("bar");
Write("");
@ -313,7 +312,7 @@ TEST_F(LogTest, ReadWrite) {
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
}
TEST_F(LogTest, ManyBlocks) {
TEST_P(LogTest, ManyBlocks) {
for (int i = 0; i < 100000; i++) {
Write(NumberString(i));
}
@ -323,7 +322,7 @@ TEST_F(LogTest, ManyBlocks) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, Fragmentation) {
TEST_P(LogTest, Fragmentation) {
Write("small");
Write(BigString("medium", 50000));
Write(BigString("large", 100000));
@ -333,7 +332,7 @@ TEST_F(LogTest, Fragmentation) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, MarginalTrailer) {
TEST_P(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize;
Write(BigString("foo", n));
@ -346,7 +345,7 @@ TEST_F(LogTest, MarginalTrailer) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, MarginalTrailer2) {
TEST_P(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize;
Write(BigString("foo", n));
@ -359,7 +358,7 @@ TEST_F(LogTest, MarginalTrailer2) {
ASSERT_EQ("", ReportMessage());
}
TEST_F(LogTest, ShortTrailer) {
TEST_P(LogTest, ShortTrailer) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes());
@ -371,7 +370,7 @@ TEST_F(LogTest, ShortTrailer) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, AlignedEof) {
TEST_P(LogTest, AlignedEof) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - kHeaderSize + 4), WrittenBytes());
@ -379,7 +378,7 @@ TEST_F(LogTest, AlignedEof) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, RandomRead) {
TEST_P(LogTest, RandomRead) {
const int N = 500;
Random write_rnd(301);
for (int i = 0; i < N; i++) {
@ -394,7 +393,7 @@ TEST_F(LogTest, RandomRead) {
// Tests of all the error paths in log_reader.cc follow:
TEST_F(LogTest, ReadError) {
TEST_P(LogTest, ReadError) {
Write("foo");
ForceError();
ASSERT_EQ("EOF", Read());
@ -402,7 +401,7 @@ TEST_F(LogTest, ReadError) {
ASSERT_EQ("OK", MatchError("read error"));
}
TEST_F(LogTest, BadRecordType) {
TEST_P(LogTest, BadRecordType) {
Write("foo");
// Type is stored in header[6]
IncrementByte(6, 100);
@ -412,7 +411,7 @@ TEST_F(LogTest, BadRecordType) {
ASSERT_EQ("OK", MatchError("unknown record type"));
}
TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read());
@ -421,7 +420,7 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
ASSERT_EQ("", ReportMessage());
}
TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) {
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency*/ true));
@ -430,7 +429,7 @@ TEST_F(LogTest, TruncatedTrailingRecordIsNotIgnored) {
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
TEST_F(LogTest, BadLength) {
TEST_P(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize));
Write("foo");
@ -441,7 +440,7 @@ TEST_F(LogTest, BadLength) {
ASSERT_EQ("OK", MatchError("bad record length"));
}
TEST_F(LogTest, BadLengthAtEndIsIgnored) {
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
@ -449,7 +448,7 @@ TEST_F(LogTest, BadLengthAtEndIsIgnored) {
ASSERT_EQ("", ReportMessage());
}
TEST_F(LogTest, BadLengthAtEndIsNotIgnored) {
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read(/*report_eof_inconsistency=*/true));
@ -457,7 +456,7 @@ TEST_F(LogTest, BadLengthAtEndIsNotIgnored) {
ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
TEST_F(LogTest, ChecksumMismatch) {
TEST_P(LogTest, ChecksumMismatch) {
Write("foo");
IncrementByte(0, 10);
ASSERT_EQ("EOF", Read());
@ -465,7 +464,7 @@ TEST_F(LogTest, ChecksumMismatch) {
ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
TEST_F(LogTest, UnexpectedMiddleType) {
TEST_P(LogTest, UnexpectedMiddleType) {
Write("foo");
SetByte(6, kMiddleType);
FixChecksum(0, 3);
@ -474,7 +473,7 @@ TEST_F(LogTest, UnexpectedMiddleType) {
ASSERT_EQ("OK", MatchError("missing start"));
}
TEST_F(LogTest, UnexpectedLastType) {
TEST_P(LogTest, UnexpectedLastType) {
Write("foo");
SetByte(6, kLastType);
FixChecksum(0, 3);
@ -483,7 +482,7 @@ TEST_F(LogTest, UnexpectedLastType) {
ASSERT_EQ("OK", MatchError("missing start"));
}
TEST_F(LogTest, UnexpectedFullType) {
TEST_P(LogTest, UnexpectedFullType) {
Write("foo");
Write("bar");
SetByte(6, kFirstType);
@ -494,7 +493,7 @@ TEST_F(LogTest, UnexpectedFullType) {
ASSERT_EQ("OK", MatchError("partial record without end"));
}
TEST_F(LogTest, UnexpectedFirstType) {
TEST_P(LogTest, UnexpectedFirstType) {
Write("foo");
Write(BigString("bar", 100000));
SetByte(6, kFirstType);
@ -505,7 +504,7 @@ TEST_F(LogTest, UnexpectedFirstType) {
ASSERT_EQ("OK", MatchError("partial record without end"));
}
TEST_F(LogTest, MissingLastIsIgnored) {
TEST_P(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
@ -514,7 +513,7 @@ TEST_F(LogTest, MissingLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes());
}
TEST_F(LogTest, MissingLastIsNotIgnored) {
TEST_P(LogTest, MissingLastIsNotIgnored) {
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
@ -523,7 +522,7 @@ TEST_F(LogTest, MissingLastIsNotIgnored) {
ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
}
TEST_F(LogTest, PartialLastIsIgnored) {
TEST_P(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
@ -532,7 +531,7 @@ TEST_F(LogTest, PartialLastIsIgnored) {
ASSERT_EQ(0U, DroppedBytes());
}
TEST_F(LogTest, PartialLastIsNotIgnored) {
TEST_P(LogTest, PartialLastIsNotIgnored) {
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
@ -543,7 +542,7 @@ TEST_F(LogTest, PartialLastIsNotIgnored) {
"error reading trailing data"));
}
TEST_F(LogTest, ErrorJoinsRecords) {
TEST_P(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2)
// where the middle two fragments disappear. We do not want
@ -566,43 +565,43 @@ TEST_F(LogTest, ErrorJoinsRecords) {
ASSERT_GE(dropped, 2 * kBlockSize);
}
TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
TEST_P(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
TEST_P(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
TEST_P(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
TEST_P(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
TEST_P(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
TEST_P(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
TEST_P(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
TEST_P(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}
TEST_F(LogTest, ReadFourthMiddleBlock) {
TEST_P(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}
TEST_F(LogTest, ReadFourthLastBlock) {
TEST_P(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}
TEST_F(LogTest, ReadFourthStart) {
TEST_P(LogTest, ReadFourthStart) {
CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
3);
}
TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
TEST_P(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
TEST_P(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
TEST_F(LogTest, ClearEofSingleBlock) {
TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
ForceEOF(3 + kHeaderSize + 2);
@ -617,7 +616,7 @@ TEST_F(LogTest, ClearEofSingleBlock) {
ASSERT_TRUE(IsEOF());
}
TEST_F(LogTest, ClearEofMultiBlock) {
TEST_P(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
Write(BigString("foo", n));
@ -634,7 +633,7 @@ TEST_F(LogTest, ClearEofMultiBlock) {
ASSERT_TRUE(IsEOF());
}
TEST_F(LogTest, ClearEofError) {
TEST_P(LogTest, ClearEofError) {
// If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return
@ -652,7 +651,7 @@ TEST_F(LogTest, ClearEofError) {
ASSERT_EQ("EOF", Read());
}
TEST_F(LogTest, ClearEofError2) {
TEST_P(LogTest, ClearEofError2) {
Write("foo");
Write("bar");
UnmarkEOF();
@ -666,6 +665,8 @@ TEST_F(LogTest, ClearEofError2) {
ASSERT_EQ("OK", MatchError("read error"));
}
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
} // namespace log
} // namespace rocksdb

View file

@ -18,8 +18,12 @@
namespace rocksdb {
namespace log {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest)
: dest_(std::move(dest)), block_offset_(0) {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, bool recycle_log_files)
: dest_(std::move(dest)),
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);

View file

@ -63,7 +63,8 @@ class Writer {
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(unique_ptr<WritableFileWriter>&& dest);
explicit Writer(unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, bool recycle_log_files);
~Writer();
Status AddRecord(const Slice& slice);
@ -74,6 +75,8 @@ class Writer {
private:
unique_ptr<WritableFileWriter> dest_;
int block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the

View file

@ -413,7 +413,7 @@ class Repairer {
{
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
edit_->EncodeTo(&record);
status = log.AddRecord(record);

View file

@ -2112,7 +2112,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
descriptor_log_.reset(new log::Writer(std::move(file_writer)));
descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());
}
}

View file

@ -77,7 +77,7 @@ class WalManagerTest : public testing::Test {
ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_));
current_log_writer_.reset(new log::Writer(std::move(file_writer)));
current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
}
void CreateArchiveLogs(int num_logs, int entries_per_log) {
@ -127,7 +127,8 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
log::Writer writer(std::move(file_writer));
log::Writer writer(std::move(file_writer), 1,
db_options_.recycle_log_file_num > 0);
WriteBatch batch;
batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10);