Add padding before timestamp size record if it doesn't fit into a WAL block. (#12614)

Summary:
If timestamp size record doesn't fit into a block, without padding `Writer::EmitPhysicalRecord` fails on assert (either `assert(block_offset_ + kHeaderSize + n <= kBlockSize);` or `assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize)`, depending on whether recycling log files is enabled)  in debug build. In release, current block grows beyond 32K, `block_offset_` gets reset on next `AddRecord` and all the subsequent blocks are no longer aligned by block size.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12614

Reviewed By: ltamasi

Differential Revision: D57302140

Pulled By: jowlyzhang

fbshipit-source-id: cacb5cefb7586885e52a8137ae23a95e1aefca2d
This commit is contained in:
Andrii Lysenko 2024-05-14 15:54:02 -07:00 committed by Facebook GitHub Bot
parent 1567d50a27
commit b9fc13db69
3 changed files with 55 additions and 10 deletions

View File

@ -774,6 +774,32 @@ TEST_P(LogTest, RecycleWithTimestampSize) {
ASSERT_EQ("EOF", Read());
}
// Validates that `MaybeAddUserDefinedTimestampSizeRecord` pads out the tail of
// the current block and switches to a fresh block when the remaining space is
// too small to hold the timestamp size record.
TEST_P(LogTest, TimestampSizeRecordPadding) {
  const bool recycle = (std::get<0>(GetParam()) != 0);
  const size_t hdr_size = recycle ? kRecyclableHeaderSize : kHeaderSize;
  // Size the first record so that exactly one header's worth of bytes is left
  // at the end of the block: enough for a header, but not for the timestamp
  // size record's header plus payload.
  const size_t payload_len = kBlockSize - 2 * hdr_size;
  const auto first_str = BigString("foo", payload_len);
  Write(first_str);
  UnorderedMap<uint32_t, size_t> ts_sz = {
      {2, sizeof(uint64_t)},
  };
  ASSERT_OK(
      writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
  // The record must have been emitted into a new block rather than letting the
  // current block overflow past kBlockSize.
  ASSERT_LT(writer_->TEST_block_offset(), kBlockSize);
  const auto second_str = BigString("bar", 1000);
  Write(second_str);
  ASSERT_EQ(first_str, Read());
  CheckRecordAndTimestampSize(second_str, ts_sz);
}
// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
Log, LogTest,

View File

@ -27,6 +27,8 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files),
// Header size varies depending on whether we are recycling or not.
header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize),
manual_flush_(manual_flush),
compression_type_(compression_type),
compress_(nullptr) {
@ -80,10 +82,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
const char* ptr = slice.data();
size_t left = slice.size();
// Header size varies depending on whether we are recycling or not.
const int header_size =
recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
@ -102,12 +100,12 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
do {
const int64_t leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < header_size) {
if (leftover < header_size_) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize and
// kRecyclableHeaderSize being <= 11)
assert(header_size <= 11);
assert(header_size_ <= 11);
s = dest_->Append(opts,
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
static_cast<size_t>(leftover)),
@ -120,9 +118,9 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
}
// Invariant: we never leave < header_size bytes in a block.
assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);
assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);
const size_t avail = kBlockSize - block_offset_ - header_size;
const size_t avail = kBlockSize - block_offset_ - header_size_;
// Compress the record if compression is enabled.
// Compress() is called at least once (compress_start=true) and after the
@ -203,8 +201,7 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
}
}
// Initialize fields required for compression
const size_t max_output_buffer_len =
kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
const size_t max_output_buffer_len = kBlockSize - header_size_;
CompressionOptions opts;
constexpr uint32_t compression_format_version = 2;
compress_ = StreamingCompress::Create(compression_type_, opts,
@ -244,6 +241,25 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
record.EncodeTo(&encoded);
RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
: kUserDefinedTimestampSizeType;
// If there's not enough space for this record, switch to a new block.
const int64_t leftover = kBlockSize - block_offset_;
if (leftover < header_size_ + (int)encoded.size()) {
IOOptions opts;
IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (!s.ok()) {
return s;
}
std::vector<char> trailer(leftover, '\x00');
s = dest_->Append(opts, Slice(trailer.data(), trailer.size()));
if (!s.ok()) {
return s;
}
block_offset_ = 0;
}
return EmitPhysicalRecord(write_options, type, encoded.data(),
encoded.size());
}

View File

@ -109,11 +109,14 @@ class Writer {
bool BufferIsEmpty();
size_t TEST_block_offset() const { return block_offset_; }
private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
int header_size_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the