From 30e82d5c411d8d57c24fa98f955ab14f10e2a3d2 Mon Sep 17 00:00:00 2001
From: Dmitri Smirnov
Date: Fri, 11 Sep 2015 09:57:02 -0700
Subject: [PATCH] Refactor to support file_reader_writer on Windows.

Summary:
A change (https://reviews.facebook.net/differential/diff/224721/)
attempted to move common functionality out of platform-dependent code into
a new facility called file_reader_writer. This includes:
- perf counters
- buffering
- rate limiting

However, the change did not attempt to refactor the Windows code.
To mitigate this, we introduce new querying interfaces, UseOSBuffer() and
GetRequiredBufferAlignment(), plus ReaderWriterForward() for pure call
forwarding where required.

WritableFile gets a new method, Truncate(), to communicate to the file how
much data it actually holds on close:
- When space is pre-allocated on Linux it is implicitly filled with zeros;
  no such thing exists on Windows, so we must truncate the file on close.
- When operating in unbuffered mode the last page is filled with zeros,
  but we still want to truncate.

Previously, Close() would take care of it, but buffer management has now
shifted to the wrappers and the file has no idea of its true size. This
means that Close() at the wrapper level must always include Truncate(),
the wrapper __dtor must call Close(), and double Close() must be guarded
against.

Move the buffered/unbuffered write logic to the wrapper.
Utilize the AlignedBuffer class.

Adjust tests and implement Truncate() where necessary.

Come up with reasonable defaults for the new virtual interfaces.

Forward calls in the ReadaheadRandomAccessFile class to avoid double
buffering and locking (double locking in unbuffered mode on Windows).
---
 db/db_bench.cc                  |   1 +
 db/fault_injection_test.cc      |   1 +
 include/rocksdb/env.h           |  32 ++-
 port/win/env_win.cc             | 446 +++++++------------------------
 util/aligned_buffer.h           | 154 +++++++++++
 util/db_test_util.h             |   5 +
 util/env_posix.cc               |  12 +
 util/env_test.cc                |   1 +
 util/file_reader_writer.cc      | 306 ++++++++++++++++------
 util/file_reader_writer.h       |  96 +++++--
 util/file_reader_writer_test.cc |   3 +
 util/memenv.cc                  |   4 +-
 util/mock_env.cc                |   4 +-
 util/testutil.h                 |   4 +
 14 files changed, 613 insertions(+), 456 deletions(-)
 create mode 100644 util/aligned_buffer.h

diff --git a/db/db_bench.cc b/db/db_bench.cc
index ca05b2b207..a98e7c7d3a 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -898,6 +898,7 @@ class ReportFileOpEnv : public EnvWrapper {
       return rv;
     }

+    Status Truncate(uint64_t size) override { return target_->Truncate(size); }
     Status Close() override { return target_->Close(); }
     Status Flush() override { return target_->Flush(); }
     Status Sync() override { return target_->Sync(); }

diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc
index 87028e381b..7ce18c8e71 100644
--- a/db/fault_injection_test.cc
+++ b/db/fault_injection_test.cc
@@ -141,6 +141,7 @@ class TestWritableFile : public WritableFile {
                    FaultInjectionTestEnv* env);
   virtual ~TestWritableFile();
   virtual Status Append(const Slice& data) override;
+  virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); }
   virtual Status Close() override;
   virtual Status Flush() override;
   virtual Status Sync() override;
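Because the env.h hunk below makes Truncate() pure virtual on WritableFile, every wrapper and test double in the tree now needs the kind of one-line forwarding override shown in the two hunks above. A minimal sketch of a pass-through wrapper; CountingFile is hypothetical and not part of this patch, and it assumes rocksdb/env.h is included:

    class CountingFile : public WritableFile {
     public:
      explicit CountingFile(std::unique_ptr<WritableFile>&& target)
          : target_(std::move(target)) {}

      Status Append(const Slice& data) override {
        bytes_ += data.size();
        return target_->Append(data);
      }
      // Without this override the wrapper becomes abstract after this patch.
      Status Truncate(uint64_t size) override { return target_->Truncate(size); }
      Status Close() override { return target_->Close(); }
      Status Flush() override { return target_->Flush(); }
      Status Sync() override { return target_->Sync(); }

     private:
      std::unique_ptr<WritableFile> target_;
      uint64_t bytes_ = 0;
    };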
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 093292f66d..b346effbad 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -393,6 +393,12 @@ class RandomAccessFile {
   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const = 0;

+  // Used by file_reader_writer to decide whether the readahead wrapper
+  // should simply forward the call instead of buffering and locking.
+  virtual bool ReaderWriterForward() const {
+    return false;
+  }
+
   // Tries to get an unique ID for this file that will be the same each time
   // the file is opened (and will stay the same while the file is open).
   // Furthermore, it tries to make this ID at most "max_size" bytes. If such an
@@ -413,7 +419,6 @@ class RandomAccessFile {
               // compatibility.
   };

-
   enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };

   virtual void Hint(AccessPattern pattern) {}
@@ -438,7 +443,31 @@ class WritableFile {
   }
   virtual ~WritableFile();

+  // Indicates whether the file uses OS buffering. When this returns false
+  // the file was opened for unbuffered I/O.
+  virtual bool UseOSBuffer() const {
+    return true;
+  }
+
+  // The alignment required when allocating an AlignedBuffer for I/O
+  // against this file. Used for unbuffered file I/O, i.e. when
+  // UseOSBuffer() returns false.
+  virtual size_t GetRequiredBufferAlignment() const {
+    return 4 * 1024;
+  }
+
   virtual Status Append(const Slice& data) = 0;
+
+  // Positional write for unbuffered access. Not supported by default,
+  // since most implementations are buffered.
+  virtual Status Append(const Slice& /* data */, uint64_t /* offset */) {
+    return Status::NotSupported();
+  }
+
+  // Truncate is necessary to trim the file to the correct size before
+  // closing. It is not always possible to keep track of the file size
+  // due to whole-page writes. The behavior is undefined if the call is
+  // followed by other writes.
+  virtual Status Truncate(uint64_t size) = 0;
   virtual Status Close() = 0;
   virtual Status Flush() = 0;
   virtual Status Sync() = 0;  // sync data
@@ -839,6 +868,7 @@ class WritableFileWrapper : public WritableFile {
   explicit WritableFileWrapper(WritableFile* t) : target_(t) { }

   Status Append(const Slice& data) override { return target_->Append(data); }
+  Status Truncate(uint64_t size) override { return target_->Truncate(size); }
   Status Close() override { return target_->Close(); }
   Status Flush() override { return target_->Flush(); }
   Status Sync() override { return target_->Sync(); }

diff --git a/port/win/env_win.cc b/port/win/env_win.cc
index 70c68b8fbc..c6d0cb3caf 100644
--- a/port/win/env_win.cc
+++ b/port/win/env_win.cc
@@ -30,6 +30,7 @@
 #include "util/iostats_context_imp.h"
 #include "util/rate_limiter.h"
 #include "util/sync_point.h"
+#include "util/aligned_buffer.h"
 #include "util/thread_status_updater.h"
 #include "util/thread_status_util.h"

@@ -161,15 +162,6 @@ inline int fsync(HANDLE hFile) {
   return 0;
 }

-inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
-  s -= (s & (page_size - 1));
-  assert((s % page_size) == 0);
-  return s;
-}
-
-// Roundup x to a multiple of y
-inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
-
 // SetFileInformationByHandle() is capable of fast pre-allocates.
 // However, this does not change the file end position unless the file is
 // truncated and the pre-allocated space is not considered filled with zeros.
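The two helpers removed here move verbatim into the new util/aligned_buffer.h further below. A self-contained illustration of the arithmetic, assuming the 4KB default from GetRequiredBufferAlignment():

    #include <cassert>
    #include <cstddef>

    // Copies of the helpers shown above, for illustration only.
    static size_t TruncateToPageBoundary(size_t page_size, size_t s) {
      return s - (s & (page_size - 1));  // page_size must be a power of two
    }
    static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }

    int main() {
      // 10000 buffered bytes with 4KB alignment: two whole pages (8192
      // bytes) can be written now; the 1808-byte tail is padded up to a
      // 12288-byte whole-page write and rewritten on the next flush.
      assert(TruncateToPageBoundary(4096, 10000) == 8192);
      assert(Roundup(10000, 4096) == 12288);
      return 0;
    }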
@@ -492,7 +484,6 @@ class WinMmapFile : public WritableFile { size_t n = std::min(left, avail); memcpy(dst_, src, n); - IOSTATS_ADD(bytes_written, n); dst_ += n; src += n; left -= n; @@ -502,6 +493,12 @@ class WinMmapFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; @@ -612,94 +609,6 @@ class WinMmapFile : public WritableFile { } }; -// This class is to manage an aligned user -// allocated buffer for unbuffered I/O purposes -// though it does not make a difference if you need a buffer. -class AlignedBuffer { - const size_t alignment_; - std::unique_ptr buf_; - size_t capacity_; - size_t cursize_; - char* bufstart_; - - public: - explicit AlignedBuffer(size_t alignment) - : alignment_(alignment), capacity_(0), cursize_(0), bufstart_(nullptr) { - assert(alignment > 0); - assert((alignment & (alignment - 1)) == 0); - } - - size_t GetAlignment() const { return alignment_; } - - size_t GetCapacity() const { return capacity_; } - - size_t GetCurrentSize() const { return cursize_; } - - const char* GetBufferStart() const { return bufstart_; } - - void Clear() { cursize_ = 0; } - - // Allocates a new buffer and sets bufstart_ to the aligned first byte - void AllocateNewBuffer(size_t requestedCapacity) { - size_t size = Roundup(requestedCapacity, alignment_); - buf_.reset(new char[size + alignment_]); - - char* p = buf_.get(); - bufstart_ = reinterpret_cast( - (reinterpret_cast(p) + (alignment_ - 1)) & - ~static_cast(alignment_ - 1)); - capacity_ = size; - cursize_ = 0; - } - - // Used for write - // Returns the number of bytes appended - size_t Append(const char* src, size_t append_size) { - size_t buffer_remaining = capacity_ - cursize_; - size_t to_copy = std::min(append_size, buffer_remaining); - - if (to_copy > 0) { - memcpy(bufstart_ + cursize_, src, to_copy); - cursize_ += to_copy; - } - return to_copy; - } - - size_t Read(char* dest, size_t offset, size_t read_size) const { - assert(offset < cursize_); - size_t to_read = std::min(cursize_ - offset, read_size); - if (to_read > 0) { - memcpy(dest, bufstart_ + offset, to_read); - } - return to_read; - } - - /// Pad to alignment - void PadToAlignmentWith(int padding) { - size_t total_size = Roundup(cursize_, alignment_); - size_t pad_size = total_size - cursize_; - - if (pad_size > 0) { - assert((pad_size + cursize_) <= capacity_); - memset(bufstart_ + cursize_, padding, pad_size); - cursize_ += pad_size; - } - } - - // After a partial flush move the tail to the beginning of the buffer - void RefitTail(size_t tail_offset, size_t tail_size) { - if (tail_size > 0) { - memmove(bufstart_, bufstart_ + tail_offset, tail_size); - } - cursize_ = tail_size; - } - - // Returns place to start writing - char* GetDestination() { return bufstart_ + cursize_; } - - void SetSize(size_t cursize) { cursize_ = cursize; } -}; - class WinSequentialFile : public SequentialFile { private: const std::string filename_; @@ -734,7 +643,7 @@ class WinSequentialFile : public SequentialFile { // Windows ReadFile API accepts a DWORD. // While it is possible to read in a loop if n is > UINT_MAX // it is a highly unlikely case. 
- if (n > UINT_MAX) { + if (n > UINT_MAX) { return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); } @@ -747,8 +656,6 @@ class WinSequentialFile : public SequentialFile { return IOErrorFromWindowsError(filename_, GetLastError()); } - IOSTATS_ADD(bytes_read, r); - *result = Slice(scratch, r); return s; @@ -791,12 +698,13 @@ class WinRandomAccessFile : public RandomAccessFile { : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), - buffer_(alignment), + buffer_(), buffered_start_(0) { assert(!options.use_mmap_reads); // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { + buffer_.SetAlignment(alignment); // Random read, no need in a big buffer // We read things in database blocks which are likely to be similar to // the alignment we use. @@ -854,11 +762,8 @@ class WinRandomAccessFile : public RandomAccessFile { } SSIZE_T read = 0; - { - IOSTATS_TIMER_GUARD(read_nanos); - read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, - start_page_start); - } + read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, + start_page_start); if (read > 0) { buffer_.SetSize(read); @@ -884,7 +789,6 @@ class WinRandomAccessFile : public RandomAccessFile { } } - IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left); *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { @@ -893,6 +797,10 @@ class WinRandomAccessFile : public RandomAccessFile { return s; } + virtual bool ReaderWriterForward() const override { + return true; + } + virtual void Hint(AccessPattern pattern) override {} virtual Status InvalidateCache(size_t offset, size_t length) override { @@ -915,33 +823,23 @@ class WinRandomAccessFile : public RandomAccessFile { class WinWritableFile : public WritableFile { private: const std::string filename_; - HANDLE hFile_; - AlignedBuffer buffer_; - - uint64_t filesize_; // How much data is actually written disk - uint64_t reservedsize_; // how far we have reserved space - - bool pending_sync_; - - RateLimiter* rate_limiter_; - - const bool use_os_buffer_; // Used to indicate unbuffered access, the file - // must be opened as unbuffered if false + HANDLE hFile_; + const bool use_os_buffer_; // Used to indicate unbuffered access, the file + const uint64_t alignment_; + // must be opened as unbuffered if false + uint64_t filesize_; // How much data is actually written disk + uint64_t reservedsize_; // how far we have reserved space public: WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, size_t capacity, const EnvOptions& options) : filename_(fname), hFile_(hFile), - buffer_(alignment), + use_os_buffer_(options.use_os_buffer), + alignment_(alignment), filesize_(0), - reservedsize_(0), - pending_sync_(false), - rate_limiter_(options.rate_limiter), - use_os_buffer_(options.use_os_buffer) { + reservedsize_(0) { assert(!options.use_mmap_writes); - - buffer_.AllocateNewBuffer(capacity); } ~WinWritableFile() { @@ -950,106 +848,84 @@ class WinWritableFile : public WritableFile { } } + // Indicates if the class makes use of unbuffered I/O + virtual bool UseOSBuffer() const override { + return use_os_buffer_; + } + + virtual size_t GetRequiredBufferAlignment() const override { + return alignment_; + } + virtual Status Append(const Slice& data) override { - const char* src = data.data(); - assert(data.size() < INT_MAX); + // Used for buffered access ONLY + assert(use_os_buffer_); + assert(data.size() < std::numeric_limits::max()); - size_t left = data.size(); Status s; - pending_sync_ = true; - // This would call 
Alloc() if we are out of blocks - PrepareWrite(GetFileSize(), left); - - // Flush only when I/O is buffered - if (use_os_buffer_ && - (buffer_.GetCapacity() - buffer_.GetCurrentSize()) < left) { - if (buffer_.GetCurrentSize() > 0) { - s = Flush(); - if (!s.ok()) { - return s; - } - } - - if (buffer_.GetCapacity() < c_OneMB) { - size_t desiredCapacity = buffer_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } - } - - // We always use the internal buffer for the unbuffered I/O - // or we simply use it for its original purpose to accumulate many small - // chunks - if (!use_os_buffer_ || (buffer_.GetCapacity() >= left)) { - while (left > 0) { - size_t appended = buffer_.Append(src, left); - left -= appended; - src += appended; - - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; - } - - size_t cursize = buffer_.GetCurrentSize(); - size_t capacity = buffer_.GetCapacity(); - - // We double the buffer here because - // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with unbuffered I/O - if (cursize == 0 && capacity < c_OneMB) { - size_t desiredCapacity = capacity * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } - } - } + DWORD bytesWritten = 0; + if (!WriteFile(hFile_, data.data(), + data.size(), &bytesWritten, NULL)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to WriteFile: " + filename_, + lastError); } else { - // Writing directly to file bypassing what is in the buffer - assert(buffer_.GetCurrentSize() == 0); - // Use rate limiter for normal I/O very large request if available - s = WriteBuffered(src, left); + assert(size_t(bytesWritten) == data.size()); + filesize_ += data.size(); } return s; } - virtual Status Close() override { + virtual Status Append(const Slice& data, uint64_t offset) override { Status s; - // If there is any data in the cache not written we need to deal with it - const size_t cursize = buffer_.GetCurrentSize(); - const uint64_t final_size = filesize_ + cursize; + SSIZE_T ret = pwrite(hFile_, data.data(), + data.size(), offset); - if (cursize > 0) { - // If OS buffering is on, we just flush the remainder, otherwise need - if (!use_os_buffer_) { - s = WriteUnbuffered(); - } else { - s = WriteBuffered(buffer_.GetBufferStart(), cursize); - } + // Error break + if (ret < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to pwrite for: " + filename_, lastError); + } else { + // With positional write it is not clear at all + // if this actually extends the filesize + assert(size_t(ret) == data.size()); + filesize_ += data.size(); } + return s; + } + // Need to implement this so the file is truncated correctly + // when buffered and unbuffered mode + virtual Status Truncate(uint64_t size) override { + Status s = ftruncate(filename_, hFile_, size); if (s.ok()) { - s = ftruncate(filename_, hFile_, final_size); + filesize_ = size; } + return s; + } - // Sync data if buffer was flushed - if (s.ok() && (cursize > 0) && fsync(hFile_) < 0) { + virtual Status Close() override { + + Status s; + + assert(INVALID_HANDLE_VALUE != hFile_); + + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, - lastError); + lastError); } if (FALSE == ::CloseHandle(hFile_)) { - if (s.ok()) { - auto lastError = GetLastError(); - s = 
IOErrorFromWindowsError("CloseHandle failed for: " + filename_, - lastError); - } + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, + lastError); } hFile_ = INVALID_HANDLE_VALUE; @@ -1057,36 +933,18 @@ class WinWritableFile : public WritableFile { } // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter virtual Status Flush() override { - Status status; - - if (buffer_.GetCurrentSize() > 0) { - if (!use_os_buffer_) { - status = WriteUnbuffered(); - } else { - status = - WriteBuffered(buffer_.GetBufferStart(), buffer_.GetCurrentSize()); - if (status.ok()) { - buffer_.SetSize(0); - } - } - } - return status; + return Status::OK(); } virtual Status Sync() override { - Status s = Flush(); - if (!s.ok()) { - return s; - } - + Status s; // Calls flush buffers - if (pending_sync_ && fsync(hFile_) < 0) { + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, lastError); - } else { - pending_sync_ = false; } return s; } @@ -1094,7 +952,12 @@ class WinWritableFile : public WritableFile { virtual Status Fsync() override { return Sync(); } virtual uint64_t GetFileSize() override { - return filesize_ + buffer_.GetCurrentSize(); + // Double accounting now here with WritableFileWriter + // and this size will be wrong when unbuffered access is used + // but tests implement their own writable files and do not use WritableFileWrapper + // so we need to squeeze a square peg through + // a round hole here. + return filesize_; } virtual Status Allocate(off_t offset, off_t len) override { @@ -1104,7 +967,7 @@ class WinWritableFile : public WritableFile { // Make sure that we reserve an aligned amount of space // since the reservation block size is driven outside so we want // to check if we are ok with reservation here - size_t spaceToReserve = Roundup(offset + len, buffer_.GetAlignment()); + size_t spaceToReserve = Roundup(offset + len, alignment_); // Nothing to do if (spaceToReserve <= reservedsize_) { return status; @@ -1117,133 +980,6 @@ class WinWritableFile : public WritableFile { } return status; } - - private: - // This method writes to disk the specified data and makes use of the rate - // limiter - // if available - Status WriteBuffered(const char* data, size_t size) { - Status s; - assert(use_os_buffer_); - const char* src = data; - size_t left = size; - - size_t actually_written = 0; - - while (left > 0) { - size_t bytes_allowed = RequestToken(left, false); - - DWORD bytesWritten = 0; - if (!WriteFile(hFile_, src, bytes_allowed, &bytesWritten, NULL)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to write buffered via rate_limiter: " + filename_, - lastError); - break; - } else { - actually_written += bytesWritten; - src += bytesWritten; - left -= bytesWritten; - } - } - - IOSTATS_ADD(bytes_written, actually_written); - filesize_ += actually_written; - - return s; - } - - // This flushes the accumulated data in the buffer. We pad data with zeros if - // necessary to the whole page. - // However, during automatic flushes padding would not be necessary. - // We always use RateLimiter if available. We move (Refit) any buffer bytes - // that are left over the - // whole number of pages to be written again on the next flush because we can - // only write on aligned - // offsets. 
- Status WriteUnbuffered() { - Status s; - - assert(!use_os_buffer_); - size_t alignment = buffer_.GetAlignment(); - assert((filesize_ % alignment) == 0); - - // Calculate whole page final file advance if all writes succeed - size_t file_advance = - TruncateToPageBoundary(alignment, buffer_.GetCurrentSize()); - - // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buffer_.GetCurrentSize() - file_advance; - - // Round up and pad - buffer_.PadToAlignmentWith(0); - - const char* src = buffer_.GetBufferStart(); - size_t left = buffer_.GetCurrentSize(); - uint64_t file_offset = filesize_; - size_t actually_written = 0; - - while (left > 0) { - // Request how much is allowed. If this is less than one alignment we may - // be blocking a lot on every write - // because we can not write less than one alignment (page) unit thus check - // the configuration. - size_t bytes_allowed = RequestToken(left, true); - SSIZE_T ret = pwrite(hFile_, buffer_.GetBufferStart() + actually_written, - bytes_allowed, file_offset); - - // Error break - if (ret < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to pwrite for unbuffered: " + filename_, lastError); - buffer_.SetSize(file_advance + leftover_tail); - break; - } - actually_written += ret; - file_offset += ret; - left -= ret; - } - - IOSTATS_ADD(bytes_written, actually_written); - - if (s.ok()) { - // Move the tail to the beginning of the buffer - // This never happens during normal Append but rather during - // explicit call to Flush()/Sync() or Close() - buffer_.RefitTail(file_advance, leftover_tail); - // This is where we start writing next time which may or not be - // the actual file size on disk. They match if the buffer size - // is a multiple of whole pages otherwise filesize_ is leftover_tail - // behind - filesize_ += file_advance; - } - return s; - } - - // This truncates the request to a single burst bytes - // and then goes through the request to make sure we are - // satisfied in the order of the I/O priority - size_t RequestToken(size_t bytes, bool align) const { - if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { - bytes = std::min( - bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); - - if (align) { - // Here we may actually require more than burst and block - // but we can not write less than one page at a time on unbuffered - // thus we may want not to use ratelimiter s - size_t alignment = buffer_.GetAlignment(); - bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); - } - - rate_limiter_->Request(bytes, io_priority_); - } - return bytes; - } }; class WinDirectory : public Directory { @@ -2092,7 +1828,7 @@ class WinEnv : public Env { ThreadPool* thread_pool_; size_t thread_id_; // Thread count in the thread. - explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h new file mode 100644 index 0000000000..7fa80926cc --- /dev/null +++ b/util/aligned_buffer.h @@ -0,0 +1,154 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. 
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include "port/port.h" + +namespace rocksdb { + +inline size_t TruncateToPageBoundary(size_t page_size, size_t s) { + s -= (s & (page_size - 1)); + assert((s % page_size) == 0); + return s; +} + +inline size_t Roundup(size_t x, size_t y) { + return ((x + y - 1) / y) * y; +} + +// This class is to manage an aligned user +// allocated buffer for unbuffered I/O purposes +// though it does not make a difference if you need a buffer. +class AlignedBuffer { + size_t alignment_; + std::unique_ptr buf_; + size_t capacity_; + size_t cursize_; + char* bufstart_; + +public: + AlignedBuffer() + : alignment_(), + capacity_(0), + cursize_(0), + bufstart_(nullptr) { + } + + AlignedBuffer(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + AlignedBuffer& operator=(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + alignment_ = std::move(o.alignment_); + buf_ = std::move(o.buf_); + capacity_ = std::move(o.capacity_); + cursize_ = std::move(o.cursize_); + bufstart_ = std::move(o.bufstart_); + return *this; + } + + AlignedBuffer(const AlignedBuffer&) = delete; + + AlignedBuffer& operator=(const AlignedBuffer&) = delete; + + size_t GetAlignment() const { + return alignment_; + } + + size_t GetCapacity() const { + return capacity_; + } + + size_t GetCurrentSize() const { + return cursize_; + } + + const char* GetBufferStart() const { + return bufstart_; + } + + void Clear() { + cursize_ = 0; + } + + void SetAlignment(size_t alignment) { + assert(alignment > 0); + assert((alignment & (alignment - 1)) == 0); + alignment_ = alignment; + } + + // Allocates a new buffer and sets bufstart_ to the aligned first byte + void AllocateNewBuffer(size_t requestedCapacity) { + + assert(alignment_ > 0); + assert((alignment_ & (alignment_ - 1)) == 0); + + size_t size = Roundup(requestedCapacity, alignment_); + buf_.reset(new char[size + alignment_]); + + char* p = buf_.get(); + bufstart_ = reinterpret_cast( + (reinterpret_cast(p)+(alignment_ - 1)) & + ~static_cast(alignment_ - 1)); + capacity_ = size; + cursize_ = 0; + } + // Used for write + // Returns the number of bytes appended + size_t Append(const char* src, size_t append_size) { + size_t buffer_remaining = capacity_ - cursize_; + size_t to_copy = std::min(append_size, buffer_remaining); + + if (to_copy > 0) { + memcpy(bufstart_ + cursize_, src, to_copy); + cursize_ += to_copy; + } + return to_copy; + } + + size_t Read(char* dest, size_t offset, size_t read_size) const { + assert(offset < cursize_); + size_t to_read = std::min(cursize_ - offset, read_size); + if (to_read > 0) { + memcpy(dest, bufstart_ + offset, to_read); + } + return to_read; + } + + /// Pad to alignment + void PadToAlignmentWith(int padding) { + size_t total_size = Roundup(cursize_, alignment_); + size_t pad_size = total_size - cursize_; + + if (pad_size > 0) { + assert((pad_size + cursize_) <= capacity_); + memset(bufstart_ + cursize_, padding, pad_size); + cursize_ += pad_size; + } + } + + // After a partial flush move the tail to the beginning of the buffer + void RefitTail(size_t tail_offset, size_t tail_size) { + if (tail_size > 0) { + memmove(bufstart_, bufstart_ + tail_offset, tail_size); + } + cursize_ = tail_size; + } + + // Returns place to 
start writing + char* GetDestination() { + return bufstart_ + cursize_; + } + + void SetSize(size_t cursize) { + cursize_ = cursize; + } +}; +} diff --git a/util/db_test_util.h b/util/db_test_util.h index bb1bfa7cb2..dfb2b3e6d4 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -148,6 +148,9 @@ class SpecialEnv : public EnvWrapper { return base_->Append(data); } } + Status Truncate(uint64_t size) override { + return base_->Truncate(size); + } Status Close() override { // SyncPoint is not supported in Released Windows Mode. #if !(defined NDEBUG) || !defined(OS_WIN) @@ -185,6 +188,7 @@ class SpecialEnv : public EnvWrapper { return base_->Append(data); } } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { @@ -225,6 +229,7 @@ class SpecialEnv : public EnvWrapper { #endif return s; } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { diff --git a/util/env_posix.cc b/util/env_posix.cc index af2ab8e5de..b856f1d708 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -494,6 +494,12 @@ class PosixMmapFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; size_t unused = limit_ - dst_; @@ -624,6 +630,12 @@ class PosixWritableFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; diff --git a/util/env_test.cc b/util/env_test.cc index 0e4feabfdd..6fe8ed80bc 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -989,6 +989,7 @@ TEST_F(EnvPosixTest, WritableFileWrapper) { } Status Append(const Slice& data) override { inc(1); return Status::OK(); } + Status Truncate(uint64_t size) override { return Status::OK(); } Status Close() override { inc(2); return Status::OK(); } Status Flush() override { inc(3); return Status::OK(); } Status Sync() override { inc(4); return Status::OK(); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 1409698437..453bb7461f 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -59,81 +59,116 @@ Status WritableFileWriter::Append(const Slice& data) { TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast(GetFileSize()), left); } - // if there is no space in the cache, then flush - if (cursize_ + left > capacity_) { - s = Flush(); - if (!s.ok()) { - return s; + + // Flush only when I/O is buffered + if (use_os_buffer_ && + (buf_.GetCapacity() - buf_.GetCurrentSize()) < left) { + if (buf_.GetCurrentSize() > 0) { + s = Flush(); + if (!s.ok()) { + return s; + } } - // Increase the buffer size, but capped at 1MB - if (capacity_ < (1 << 20)) { - capacity_ *= 2; - buf_.reset(new char[capacity_]); + + if (buf_.GetCapacity() < (1 << 20)) { + size_t desiredCapacity = buf_.GetCapacity() * 2; + desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + buf_.AllocateNewBuffer(desiredCapacity); } - assert(cursize_ == 0); 
+    assert(buf_.GetCurrentSize() == 0);
   }

-  // if the write fits into the cache, then write to cache
-  // otherwise do a write() syscall to write to OS buffers.
-  if (cursize_ + left <= capacity_) {
-    memcpy(buf_.get() + cursize_, src, left);
-    cursize_ += left;
-  } else {
-    while (left != 0) {
-      size_t size = RequestToken(left);
-      {
-        IOSTATS_TIMER_GUARD(write_nanos);
-        s = writable_file_->Append(Slice(src, size));
+  // With unbuffered I/O we never write directly to disk from here; in
+  // buffered mode the buffer simply serves its original purpose of
+  // accumulating many small chunks
+  if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) {
+    while (left > 0) {
+      size_t appended = buf_.Append(src, left);
+      left -= appended;
+      src += appended;
+
+      if (left > 0) {
+        s = Flush();
         if (!s.ok()) {
-          return s;
+          break;
+        }
+
+        // We double the buffer here because
+        // Flush calls do not keep up with the incoming bytes.
+        // This is the only place the buffer is resized with unbuffered I/O
+        if (buf_.GetCapacity() < (1 << 20)) {
+          size_t desiredCapacity = buf_.GetCapacity() * 2;
+          desiredCapacity = std::min(desiredCapacity, size_t(1 << 20));
+          buf_.AllocateNewBuffer(desiredCapacity);
         }
       }
-      IOSTATS_ADD(bytes_written, size);
-      TEST_KILL_RANDOM(rocksdb_kill_odds);
-
-      left -= size;
-      src += size;
     }
+  } else {
+    // Writing directly to the file, bypassing the buffer
+    assert(buf_.GetCurrentSize() == 0);
+    s = WriteBuffered(src, left);
   }

+  TEST_KILL_RANDOM(rocksdb_kill_odds);
   filesize_ += data.size();
   return Status::OK();
 }

 Status WritableFileWriter::Close() {
+
+  // Do not quit immediately on failure; the file MUST be closed
   Status s;

-  s = Flush();  // flush cache to OS
-  if (!s.ok()) {
+  // It is now possible to get here twice, since the destructor MUST call
+  // Close(); simply flushing is not enough. Windows does not fill
+  // pre-allocated space with zeros, and with unbuffered access we must
+  // also set the end of the data.
+  if (!writable_file_) {
     return s;
   }

+  s = Flush();  // flush cache to OS
+
+  // In unbuffered mode we write whole pages, so
+  // we need to let the file know where the data ends.
+  Status interim = writable_file_->Truncate(filesize_);
+  if (!interim.ok() && s.ok()) {
+    s = interim;
+  }
+
   TEST_KILL_RANDOM(rocksdb_kill_odds);
-  return writable_file_->Close();
+  interim = writable_file_->Close();
+  if (!interim.ok() && s.ok()) {
+    s = interim;
+  }
+
+  writable_file_.reset();
+
+  return s;
 }
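The resulting Close() contract for callers: flush, truncate to the true data size, close the underlying file, and tolerate a second Close() from the destructor. A usage sketch; the path, env pointer, and EnvOptions values here are illustrative:

    std::unique_ptr<WritableFile> file;
    Status s = env->NewWritableFile("/tmp/example.dat", &file, EnvOptions());
    if (s.ok()) {
      WritableFileWriter writer(std::move(file), EnvOptions());
      s = writer.Append(Slice("payload"));
      if (s.ok()) {
        s = writer.Close();  // Flush() + Truncate(filesize_) + Close()
      }
    }  // ~WritableFileWriter calls Close() again; writable_file_ is reset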
+// write out the cached data to the OS cache
 Status WritableFileWriter::Flush() {
+  Status s;
   TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
-  size_t left = cursize_;
-  char* src = buf_.get();
-  while (left != 0) {
-    size_t size = RequestToken(left);
-    {
-      IOSTATS_TIMER_GUARD(write_nanos);
-      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
-      Status s = writable_file_->Append(Slice(src, size));
-      if (!s.ok()) {
-        return s;
-      }
-    }
-    IOSTATS_ADD(bytes_written, size);
-    TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
-    left -= size;
-    src += size;
-  }
-  cursize_ = 0;
-  writable_file_->Flush();
+  if (buf_.GetCurrentSize() > 0) {
+    if (use_os_buffer_) {
+      s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize());
+    } else {
+      s = WriteUnbuffered();
+    }
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  s = writable_file_->Flush();
+
+  if (!s.ok()) {
+    return s;
+  }

   // sync OS cache to disk for every bytes_per_sync_
   // TODO: give log file and sst file different options (log
@@ -147,21 +182,21 @@ Status WritableFileWriter::Flush() {
   // Xfs does neighbor page flushing outside of the specified ranges. We
   // need to make sure the sync range is far from the write offset.
   if (!direct_io_ && bytes_per_sync_) {
-    uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced.
-    uint64_t kBytesAlignWhenSync = 4 * 1024;    // Align 4KB.
+    const uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced.
+    const uint64_t kBytesAlignWhenSync = 4 * 1024;    // Align 4KB.
     if (filesize_ > kBytesNotSyncRange) {
       uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
       offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
       assert(offset_sync_to >= last_sync_size_);
       if (offset_sync_to > 0 &&
           offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
-        RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
+        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
         last_sync_size_ = offset_sync_to;
       }
     }
   }

-  return Status::OK();
+  return s;
 }

 Status WritableFileWriter::Sync(bool use_fsync) {
@@ -214,27 +249,140 @@ Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
   return writable_file_->RangeSync(offset, nbytes);
 }

-size_t WritableFileWriter::RequestToken(size_t bytes) {
+size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
   Env::IOPriority io_priority;
-  if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) <
+  if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) <
       Env::IO_TOTAL) {
-    bytes = std::min(bytes,
-                     static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+    bytes = std::min(
+        bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+
+    if (align) {
+      // Here we may actually require more than the burst and block,
+      // but we can not write less than one page at a time on unbuffered,
+      // thus we may prefer not to use the rate limiter
+      size_t alignment = buf_.GetAlignment();
+      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
+    }
     rate_limiter_->Request(bytes, io_priority);
   }
   return bytes;
 }

+// This method writes to disk the specified data and makes use of the rate
+// limiter if available
+Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
+  Status s;
+  assert(use_os_buffer_);
+  const char* src = data;
+  size_t left = size;
+
+  while (left > 0) {
+    size_t allowed = RequestToken(left, false);
+
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      s = writable_file_->Append(Slice(src, allowed));
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, allowed);
+    TEST_KILL_RANDOM(rocksdb_kill_odds);
+
+    left -= allowed;
+    src += allowed;
+  }
+  buf_.SetSize(0);
+  return s;
+}
+
+
+// This flushes the accumulated data in the buffer. We pad data with zeros if
+// necessary to the whole page.
+// However, during automatic flushes padding would not be necessary.
+// We always use the RateLimiter if available. We move (refit) any leftover
+// bytes beyond the last whole page back to the beginning of the buffer, to
+// be written again on the next flush, because we can only write at aligned
+// offsets.
+Status WritableFileWriter::WriteUnbuffered() {
+  Status s;
+
+  assert(!use_os_buffer_);
+  const size_t alignment = buf_.GetAlignment();
+  assert((next_write_offset_ % alignment) == 0);
+
+  // Calculate whole page final file advance if all writes succeed
+  size_t file_advance =
+      TruncateToPageBoundary(alignment, buf_.GetCurrentSize());
+
+  // Calculate the leftover tail; we write it here padded with zeros, BUT
+  // we will write it again in the future, either on Close() OR when the
+  // current whole page fills out
+  size_t leftover_tail = buf_.GetCurrentSize() - file_advance;
+
+  // Round up and pad
+  buf_.PadToAlignmentWith(0);
+
+  const char* src = buf_.GetBufferStart();
+  uint64_t write_offset = next_write_offset_;
+  size_t left = buf_.GetCurrentSize();
+
+  while (left > 0) {
+    // Check how much is allowed
+    size_t size = RequestToken(left, true);
+
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      // Unbuffered writes must be positional
+      s = writable_file_->Append(Slice(src, size), write_offset);
+      if (!s.ok()) {
+        buf_.SetSize(file_advance + leftover_tail);
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, size);
+    left -= size;
+    src += size;
+    write_offset += size;
+    assert((next_write_offset_ % alignment) == 0);
+  }
+
+  if (s.ok()) {
+    // Move the tail to the beginning of the buffer.
+    // This never happens during normal Append but rather during an
+    // explicit call to Flush()/Sync() or Close()
+    buf_.RefitTail(file_advance, leftover_tail);
+    // This is where we start writing next time, which may or may not be
+    // the actual file size on disk. They match if the buffer size is a
+    // multiple of whole pages; otherwise next_write_offset_ is
+    // leftover_tail behind
+    next_write_offset_ += file_advance;
+  }
+  return s;
+}
+
+
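A worked trace of the bookkeeping above, using the AlignedBuffer API from util/aligned_buffer.h and assuming a 4KB alignment with 10000 buffered bytes (a sketch, not part of the patch):

    AlignedBuffer buf;
    buf.SetAlignment(4096);
    buf.AllocateNewBuffer(16 * 1024);
    // ... Append() has accumulated 10000 bytes ...
    size_t file_advance = TruncateToPageBoundary(4096, buf.GetCurrentSize());
    size_t leftover_tail = buf.GetCurrentSize() - file_advance;  // 1808
    buf.PadToAlignmentWith(0);  // current size grows to 12288, whole pages
    // ... positional Append() writes 12288 bytes at next_write_offset_ ...
    buf.RefitTail(file_advance, leftover_tail);  // tail moves to the front
    // next_write_offset_ advances by 8192; the tail is rewritten (padded
    // again) on the next flush, and Truncate() on Close() trims the final
    // padding.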
 namespace {
 class ReadaheadRandomAccessFile : public RandomAccessFile {
  public:
-  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
-                            size_t readahead_size)
-      : file_(std::move(file)),
-        readahead_size_(readahead_size),
-        buffer_(new char[readahead_size_]),
-        buffer_offset_(0),
-        buffer_len_(0) {}
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        forward_calls_(file_->ReaderWriterForward()),
+        buffer_(new char[readahead_size_]),
+        buffer_offset_(0),
+        buffer_len_(0) {}
+
+  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
+
+  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete;

   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const override {
@@ -242,14 +390,22 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
       return file_->Read(offset, n, result, scratch);
     }

+    // On Windows in unbuffered mode this will lead to double buffering
+    // and double locking, so we avoid that.
+    // In normal mode Windows caches so much data from disk that we do
+    // not need readahead.
+    if (forward_calls_) {
+      return file_->Read(offset, n, result, scratch);
+    }
+
     std::unique_lock<std::mutex> lk(lock_);

     size_t copied = 0;
     // if offset is in [buffer_offset_, buffer_offset_ + buffer_len_)
     if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
       uint64_t offset_in_buffer = offset - buffer_offset_;
-      copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
-                        static_cast<uint64_t>(n));
+      copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
+          static_cast<uint64_t>(n));
       memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
       if (copied == n) {
         // fully cached
@@ -259,7 +415,7 @@
     }
     Slice readahead_result;
     Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
-                           buffer_.get());
+        buffer_.get());
     if (!s.ok()) {
       return s;
     }
@@ -290,20 +446,20 @@

  private:
   std::unique_ptr<RandomAccessFile> file_;
-  size_t readahead_size_;
+  size_t             readahead_size_;
+  const bool         forward_calls_;

-  mutable std::mutex lock_;
+  mutable std::mutex lock_;
   mutable std::unique_ptr<char[]> buffer_;
-  mutable uint64_t buffer_offset_;
-  mutable size_t buffer_len_;
+  mutable uint64_t   buffer_offset_;
+  mutable size_t     buffer_len_;
 };
 }  // namespace

 std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
-    std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
-  std::unique_ptr<RandomAccessFile> wrapped_file(
-      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
-  return std::move(wrapped_file);
+    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
+  return std::make_unique<ReadaheadRandomAccessFile>(
+      std::move(file), readahead_size);
 }
 }  // namespace rocksdb

diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h
index 616d174a23..a6d52c8814 100644
--- a/util/file_reader_writer.h
+++ b/util/file_reader_writer.h
@@ -8,6 +8,8 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #include "rocksdb/env.h" +#include "util/aligned_buffer.h" +#include "port/port.h" namespace rocksdb { @@ -15,7 +17,7 @@ class Statistics; class HistogramImpl; std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr file, size_t readahead_size); + std::unique_ptr&& file, size_t readahead_size); class SequentialFileReader { private: @@ -24,6 +26,19 @@ class SequentialFileReader { public: explicit SequentialFileReader(std::unique_ptr&& _file) : file_(std::move(_file)) {} + + SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + return *this; + } + + SequentialFileReader(SequentialFileReader&) = delete; + SequentialFileReader& operator=(SequentialFileReader&) = delete; + Status Read(size_t n, Slice* result, char* scratch); Status Skip(uint64_t n); @@ -34,10 +49,10 @@ class SequentialFileReader { class RandomAccessFileReader : public RandomAccessFile { private: std::unique_ptr file_; - Env* env_; - Statistics* stats_; - uint32_t hist_type_; - HistogramImpl* file_read_hist_; + Env* env_; + Statistics* stats_; + uint32_t hist_type_; + HistogramImpl* file_read_hist_; public: explicit RandomAccessFileReader(std::unique_ptr&& raf, @@ -51,6 +66,22 @@ class RandomAccessFileReader : public RandomAccessFile { hist_type_(hist_type), file_read_hist_(file_read_hist) {} + RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{ + file_ = std::move(o.file_); + env_ = std::move(o.env_); + stats_ = std::move(o.stats_); + hist_type_ = std::move(o.hist_type_); + file_read_hist_ = std::move(o.file_read_hist_); + return *this; + } + + RandomAccessFileReader(const RandomAccessFileReader&) = delete; + RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; RandomAccessFile* file() { return file_.get(); } @@ -60,33 +91,47 @@ class RandomAccessFileReader : public RandomAccessFile { class WritableFileWriter { private: std::unique_ptr writable_file_; - size_t cursize_; // current size of cached data in buf_ - size_t capacity_; // max size of buf_ - unique_ptr buf_; // a buffer to cache writes - uint64_t filesize_; - bool pending_sync_; - bool pending_fsync_; - bool direct_io_; - uint64_t last_sync_size_; - uint64_t bytes_per_sync_; - RateLimiter* rate_limiter_; + AlignedBuffer buf_; + // Actually written data size can be used for truncate + // not counting padding data + uint64_t filesize_; + // This is necessary when we use unbuffered access + // and writes must happen on aligned offsets + // so we need to go back and write that page again + uint64_t next_write_offset_; + bool pending_sync_; + bool pending_fsync_; + const bool direct_io_; + const bool use_os_buffer_; + uint64_t last_sync_size_; + uint64_t bytes_per_sync_; + RateLimiter* rate_limiter_; public: - explicit WritableFileWriter(std::unique_ptr&& file, - const EnvOptions& options) + WritableFileWriter(std::unique_ptr&& file, + const EnvOptions& options) : writable_file_(std::move(file)), - cursize_(0), - capacity_(65536), - buf_(new char[capacity_]), + buf_(), filesize_(0), + next_write_offset_(0), pending_sync_(false), pending_fsync_(false), direct_io_(writable_file_->UseDirectIO()), + use_os_buffer_(writable_file_->UseOSBuffer()), last_sync_size_(0), 
bytes_per_sync_(options.bytes_per_sync), - rate_limiter_(options.rate_limiter) {} + rate_limiter_(options.rate_limiter) { + + buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment()); + buf_.AllocateNewBuffer(65536); + } + + WritableFileWriter(const WritableFileWriter&) = delete; + + WritableFileWriter& operator=(const WritableFileWriter&) = delete; + + ~WritableFileWriter() { Close(); } - ~WritableFileWriter() { Flush(); } Status Append(const Slice& data); Status Flush(); @@ -109,8 +154,13 @@ class WritableFileWriter { WritableFile* writable_file() const { return writable_file_.get(); } private: + // Used when os buffering is OFF and we are writing + // DMA such as in Windows unbuffered mode + Status WriteUnbuffered(); + // Normal write + Status WriteBuffered(const char* data, size_t size); Status RangeSync(off_t offset, off_t nbytes); - size_t RequestToken(size_t bytes); + size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; } // namespace rocksdb diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 924c171b70..d1f0dcbec7 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -24,6 +24,9 @@ TEST_F(WritableFileWriterTest, RangeSync) { size_ += data.size(); return Status::OK(); } + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } Status Close() override { EXPECT_GE(size_, last_synced_ + kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb); diff --git a/util/memenv.cc b/util/memenv.cc index 72fdbd2421..5737370230 100644 --- a/util/memenv.cc +++ b/util/memenv.cc @@ -232,7 +232,9 @@ class WritableFileImpl : public WritableFile { virtual Status Append(const Slice& data) override { return file_->Append(data); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { return Status::OK(); } virtual Status Sync() override { return Status::OK(); } diff --git a/util/mock_env.cc b/util/mock_env.cc index 088675071c..409e16e3af 100644 --- a/util/mock_env.cc +++ b/util/mock_env.cc @@ -250,7 +250,9 @@ class MockWritableFile : public WritableFile { } return Status::OK(); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return file_->Fsync(); } virtual Status Flush() override { return Status::OK(); } diff --git a/util/testutil.h b/util/testutil.h index 67a2aafad7..990a3ba818 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -184,6 +184,10 @@ class StringSink: public WritableFile { const std::string& contents() const { return contents_; } + virtual Status Truncate(uint64_t size) override { + contents_.resize(size); + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { if (reader_contents_ != nullptr) {