diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 5924879cc7..0c2ec5059e 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -97,20 +97,31 @@ public: } // Allocates a new buffer and sets bufstart_ to the aligned first byte - void AllocateNewBuffer(size_t requestedCapacity) { - + void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false) { assert(alignment_ > 0); assert((alignment_ & (alignment_ - 1)) == 0); - size_t size = Roundup(requestedCapacity, alignment_); - buf_.reset(new char[size + alignment_]); + if (copy_data && requested_capacity < cursize_) { + // If we are downsizing to a capacity that is smaller than the current + // data in the buffer. Ignore the request. + return; + } - char* p = buf_.get(); - bufstart_ = reinterpret_cast( - (reinterpret_cast(p)+(alignment_ - 1)) & - ~static_cast(alignment_ - 1)); - capacity_ = size; - cursize_ = 0; + size_t new_capacity = Roundup(requested_capacity, alignment_); + char* new_buf = new char[new_capacity + alignment_]; + char* new_bufstart = reinterpret_cast( + (reinterpret_cast(new_buf) + (alignment_ - 1)) & + ~static_cast(alignment_ - 1)); + + if (copy_data) { + memcpy(new_bufstart, bufstart_, cursize_); + } else { + cursize_ = 0; + } + + bufstart_ = new_bufstart; + capacity_ = new_capacity; + buf_.reset(new_buf); } // Used for write // Returns the number of bytes appended diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 0e5d40f679..a29fe97150 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -124,6 +124,22 @@ Status WritableFileWriter::Append(const Slice& data) { writable_file_->PrepareWrite(static_cast(GetFileSize()), left); } + // See whether we need to enlarge the buffer to avoid the flush + if (buf_.Capacity() - buf_.CurrentSize() < left) { + for (size_t cap = buf_.Capacity(); + cap < max_buffer_size_; // There is still room to increase + cap *= 2) { + // See whether the next available size is large enough. + // Buffer will never be increased to more than max_buffer_size_. + size_t desired_capacity = std::min(cap * 2, max_buffer_size_); + if (desired_capacity - buf_.CurrentSize() >= left || + (use_direct_io() && desired_capacity == max_buffer_size_)) { + buf_.AllocateNewBuffer(desired_capacity, true); + break; + } + } + } + // Flush only when buffered I/O if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { if (buf_.CurrentSize() > 0) { @@ -132,12 +148,6 @@ Status WritableFileWriter::Append(const Slice& data) { return s; } } - - if (buf_.Capacity() < max_buffer_size_) { - size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, max_buffer_size_); - buf_.AllocateNewBuffer(desiredCapacity); - } assert(buf_.CurrentSize() == 0); } @@ -155,15 +165,6 @@ Status WritableFileWriter::Append(const Slice& data) { if (!s.ok()) { break; } - - // We double the buffer here because - // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with direct I/O - if (buf_.Capacity() < max_buffer_size_) { - size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, max_buffer_size_); - buf_.AllocateNewBuffer(desiredCapacity); - } } } } else { diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 2823463e00..4a82e1ddc7 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -146,9 +146,7 @@ class WritableFileWriter { rate_limiter_(options.rate_limiter), stats_(stats) { buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); - buf_.AllocateNewBuffer(use_direct_io() - ? max_buffer_size_ - : std::min((size_t)65536, max_buffer_size_)); + buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); } WritableFileWriter(const WritableFileWriter&) = delete; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 02a2d3b59d..00a32fc253 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -89,6 +89,86 @@ TEST_F(WritableFileWriterTest, RangeSync) { writer->Close(); } +TEST_F(WritableFileWriterTest, IncrementalBuffer) { + class FakeWF : public WritableFile { + public: + explicit FakeWF(std::string* _file_data, bool _use_direct_io, + bool _no_flush) + : file_data_(_file_data), + use_direct_io_(_use_direct_io), + no_flush_(_no_flush) {} + ~FakeWF() {} + + Status Append(const Slice& data) override { + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return Status::OK(); + } + Status PositionedAppend(const Slice& data, uint64_t pos) override { + EXPECT_TRUE(pos % 512 == 0); + EXPECT_TRUE(data.size() % 512 == 0); + file_data_->resize(pos); + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return Status::OK(); + } + + virtual Status Truncate(uint64_t size) override { + file_data_->resize(size); + return Status::OK(); + } + Status Close() override { return Status::OK(); } + Status Flush() override { return Status::OK(); } + Status Sync() override { return Status::OK(); } + Status Fsync() override { return Status::OK(); } + void SetIOPriority(Env::IOPriority pri) override {} + uint64_t GetFileSize() override { return size_; } + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override {} + size_t GetUniqueId(char* id, size_t max_size) const override { return 0; } + Status InvalidateCache(size_t offset, size_t length) override { + return Status::OK(); + } + bool use_direct_io() const override { return use_direct_io_; } + + std::string* file_data_; + bool use_direct_io_; + bool no_flush_; + size_t size_ = 0; + }; + + Random r(301); + const int kNumAttempts = 50; + for (int attempt = 0; attempt < kNumAttempts; attempt++) { + bool no_flush = (attempt % 3 == 0); + EnvOptions env_options; + env_options.writable_file_max_buffer_size = + (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024; + std::string actual; + unique_ptr wf(new FakeWF(&actual, attempt % 2 == 1, no_flush)); + unique_ptr writer( + new WritableFileWriter(std::move(wf), env_options)); + + std::string target; + for (int i = 0; i < 20; i++) { + uint32_t num = r.Skewed(16) * 100 + r.Uniform(100); + std::string random_string; + test::RandomString(&r, num, &random_string); + writer->Append(Slice(random_string.c_str(), num)); + target.append(random_string.c_str(), num); + + // In some attempts, flush in a chance of 1/10. + if (!no_flush && r.Uniform(10) == 0) { + writer->Flush(); + } + } + writer->Flush(); + writer->Close(); + ASSERT_EQ(target.size(), actual.size()); + ASSERT_EQ(target, actual); + } +} + #ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public WritableFile {