From b9fc13db69ebddc746a0a0ebad7f6cf37144a0cb Mon Sep 17 00:00:00 2001
From: Andrii Lysenko
Date: Tue, 14 May 2024 15:54:02 -0700
Subject: [PATCH] Add padding before timestamp size record if it doesn't fit
 into a WAL block. (#12614)

Summary:
If timestamp size record doesn't fit into a block, without padding
`Writer::EmitPhysicalRecord` fails on assert (either
`assert(block_offset_ + kHeaderSize + n <= kBlockSize);` or
`assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);`,
depending on whether recycling log files is enabled) in debug build.
In release, current block grows beyond 32K, `block_offset_` gets reset
on next `AddRecord` and all the subsequent blocks are no longer aligned
by block size.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12614

Reviewed By: ltamasi

Differential Revision: D57302140

Pulled By: jowlyzhang

fbshipit-source-id: cacb5cefb7586885e52a8137ae23a95e1aefca2d
---
 db/log_test.cc   | 26 ++++++++++++++++++++++++++
 db/log_writer.cc | 36 ++++++++++++++++++++++++++----------
 db/log_writer.h  |  3 +++
 3 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/db/log_test.cc b/db/log_test.cc
index 79ff02a04b..57b6f64faa 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -774,6 +774,32 @@ TEST_P(LogTest, RecycleWithTimestampSize) {
   ASSERT_EQ("EOF", Read());
 }
 
+// Validates that `MaybeAddUserDefinedTimestampSizeRecord` adds padding to the
+// tail of a block and switches to a new block, if there's not enough space for
+// the record.
+TEST_P(LogTest, TimestampSizeRecordPadding) {
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  const size_t header_size =
+      recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
+  const size_t data_len = kBlockSize - 2 * header_size;
+
+  const auto first_str = BigString("foo", data_len);
+  Write(first_str);
+
+  UnorderedMap ts_sz = {
+      {2, sizeof(uint64_t)},
+  };
+  ASSERT_OK(
+      writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
+  ASSERT_LT(writer_->TEST_block_offset(), kBlockSize);
+
+  const auto second_str = BigString("bar", 1000);
+  Write(second_str);
+
+  ASSERT_EQ(first_str, Read());
+  CheckRecordAndTimestampSize(second_str, ts_sz);
+}
+
 // Do NOT enable compression for this instantiation.
 INSTANTIATE_TEST_CASE_P(
     Log, LogTest,
diff --git a/db/log_writer.cc b/db/log_writer.cc
index e61efc9eef..2cd6bbd788 100644
--- a/db/log_writer.cc
+++ b/db/log_writer.cc
@@ -27,6 +27,8 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
       block_offset_(0),
       log_number_(log_number),
       recycle_log_files_(recycle_log_files),
+      // Header size varies depending on whether we are recycling or not.
+      header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize),
       manual_flush_(manual_flush),
       compression_type_(compression_type),
       compress_(nullptr) {
@@ -80,10 +82,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
   const char* ptr = slice.data();
   size_t left = slice.size();
 
-  // Header size varies depending on whether we are recycling or not.
-  const int header_size =
-      recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;
-
   // Fragment the record if necessary and emit it.  Note that if slice
   // is empty, we still want to iterate once to emit a single
   // zero-length record
@@ -102,12 +100,12 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
   do {
     const int64_t leftover = kBlockSize - block_offset_;
     assert(leftover >= 0);
-    if (leftover < header_size) {
+    if (leftover < header_size_) {
       // Switch to a new block
       if (leftover > 0) {
         // Fill the trailer (literal below relies on kHeaderSize and
         // kRecyclableHeaderSize being <= 11)
-        assert(header_size <= 11);
+        assert(header_size_ <= 11);
         s = dest_->Append(opts,
                           Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
                                 static_cast<size_t>(leftover)),
@@ -120,9 +118,9 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
     }
 
     // Invariant: we never leave < header_size bytes in a block.
-    assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);
+    assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);
 
-    const size_t avail = kBlockSize - block_offset_ - header_size;
+    const size_t avail = kBlockSize - block_offset_ - header_size_;
 
     // Compress the record if compression is enabled.
     // Compress() is called at least once (compress_start=true) and after the
@@ -203,8 +201,7 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
     }
   }
   // Initialize fields required for compression
-  const size_t max_output_buffer_len =
-      kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
+  const size_t max_output_buffer_len = kBlockSize - header_size_;
   CompressionOptions opts;
   constexpr uint32_t compression_format_version = 2;
   compress_ = StreamingCompress::Create(compression_type_, opts,
@@ -244,6 +241,25 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
   record.EncodeTo(&encoded);
   RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
                                        : kUserDefinedTimestampSizeType;
+
+  // If there's not enough space for this record, switch to a new block.
+  const int64_t leftover = kBlockSize - block_offset_;
+  if (leftover < header_size_ + (int)encoded.size()) {
+    IOOptions opts;
+    IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::vector<char> trailer(leftover, '\x00');
+    s = dest_->Append(opts, Slice(trailer.data(), trailer.size()));
+    if (!s.ok()) {
+      return s;
+    }
+
+    block_offset_ = 0;
+  }
+
   return EmitPhysicalRecord(write_options, type, encoded.data(),
                             encoded.size());
 }
diff --git a/db/log_writer.h b/db/log_writer.h
index 1bbf72569e..48fd3db7c2 100644
--- a/db/log_writer.h
+++ b/db/log_writer.h
@@ -109,11 +109,14 @@ class Writer {
 
   bool BufferIsEmpty();
 
+  size_t TEST_block_offset() const { return block_offset_; }
+
  private:
   std::unique_ptr<WritableFileWriter> dest_;
   size_t block_offset_;  // Current offset in block
   uint64_t log_number_;
   bool recycle_log_files_;
+  int header_size_;
 
   // crc32c values for all supported record types.  These are
   // pre-computed to reduce the overhead of computing the crc of the