From 6fbc4f9f3e48ff18b381d9ec22dc3dcd58cb8bf0 Mon Sep 17 00:00:00 2001
From: Dmitri Smirnov
Date: Tue, 27 Oct 2015 14:44:16 -0700
Subject: [PATCH 1/2] Implement smart buffer management.

Introduce a new DBOptions member, random_access_max_buffer_size, to limit
the size of the random access buffer used for unbuffered access.
Implement read-ahead buffering when it is enabled. To that effect, propagate
compaction_readahead_size and the new option to the EnvOptions so they are
available to the implementation.
Add a Hint() override so the SetupForCompaction() call invokes Hint();
read-ahead can now be set up from both Hint() and EnableReadAhead().
Add support for the new random_access_max_buffer_size option to db_bench and
options_helper (to make it string-parsable), and add a unit test.
---
 db/db_bench.cc             |   3 +
 include/rocksdb/env.h      |  11 +++
 include/rocksdb/options.h  |  14 ++++
 port/win/env_win.cc        | 156 +++++++++++++++++++++++++++++--------
 util/env.cc                |   2 +
 util/file_reader_writer.cc |  10 ++-
 util/options.cc            |   6 ++
 util/options_helper.h      |   3 +
 util/options_test.cc       |   2 +
 9 files changed, 172 insertions(+), 35 deletions(-)

diff --git a/db/db_bench.cc b/db/db_bench.cc
index 9e11b56e82..9ba04a24b8 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -373,6 +373,8 @@ DEFINE_int32(new_table_reader_for_compaction_inputs, true,
 
 DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
 
+DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum Windows random access buffer size");
+
 DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
              " use default settings.");
 DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
@@ -2295,6 +2297,7 @@ class Benchmark {
     options.new_table_reader_for_compaction_inputs =
         FLAGS_new_table_reader_for_compaction_inputs;
     options.compaction_readahead_size = FLAGS_compaction_readahead_size;
+    options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
     options.statistics = dbstats;
     if (FLAGS_enable_io_prio) {
       FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 1dfd0f997b..7290d4b1d1 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -88,6 +88,12 @@ struct EnvOptions {
   // WAL writes
   bool fallocate_with_keep_size = true;
 
+  // See DBOptions doc
+  size_t compaction_readahead_size;
+
+  // See DBOptions doc
+  size_t random_access_max_buffer_size;
+
   // If not nullptr, write rate limiting is enabled for flush and compaction
   RateLimiter* rate_limiter = nullptr;
 };
@@ -408,6 +414,11 @@ class RandomAccessFile {
     return false;
   }
 
+  // For cases when read-ahead is implemented in the platform-dependent
+  // layer
+  virtual void EnableReadAhead() {
+  }
+
   // Tries to get an unique ID for this file that will be the same each time
   // the file is opened (and will stay the same while the file is open).
   // Furthermore, it tries to make this ID at most "max_size" bytes. If such an
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index a9830221ac..470faa52f6 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1074,6 +1074,20 @@ struct DBOptions {
   // Default: 0
   size_t compaction_readahead_size;
 
+  // This is the maximum buffer size that is used by WinRandomAccessFile in
+  // unbuffered disk I/O mode. We need to maintain an aligned buffer for
+  // reads. We allow the buffer to grow up to the specified value and then
+  // allocate one-shot buffers for bigger requests. In unbuffered mode we
+  // always bypass the read-ahead buffer at ReadaheadRandomAccessFile. When
+  // read-ahead is required, we then make use of compaction_readahead_size
+  // and always try to read ahead. With read-ahead we always pre-allocate
+  // the buffer to that size instead of growing it up to a limit.
+  //
+  // This option is currently honored only on Windows.
+  //
+  // Default: 1 MB
+  size_t random_access_max_buffer_size;
+
   // Use adaptive mutex, which spins in the user space before resorting
   // to kernel. This could reduce context switch when the mutex is not
   // heavily contended. However, if the mutex is hot, we could end up
diff --git a/port/win/env_win.cc b/port/win/env_win.cc
index c55cd30391..b672981505 100644
--- a/port/win/env_win.cc
+++ b/port/win/env_win.cc
@@ -688,28 +688,98 @@ class WinRandomAccessFile : public RandomAccessFile {
   const std::string filename_;
   HANDLE hFile_;
   const bool use_os_buffer_;
+  bool read_ahead_;
+  const size_t compaction_readahead_size_;
+  const size_t random_access_max_buffer_size_;
   mutable std::mutex buffer_mut_;
   mutable AlignedBuffer buffer_;
   mutable uint64_t buffered_start_;  // file offset set that is currently buffered
+
+  /*
+   * Reads a requested amount of bytes into the specified aligned buffer.
+   * Upon success the function sets the length of the buffer to the amount of
+   * bytes actually read, even though it might be less than requested. It
+   * then copies the amount of bytes requested by the user (left) into the
+   * user-supplied buffer (dest) and reduces left by the amount of bytes
+   * copied to the user buffer.
+   *
+   * @user_offset [in] - offset on disk where the read was requested by the user
+   * @first_page_start [in] - actual page-aligned disk offset that we want to read from
+   * @bytes_to_read [in] - total amount of bytes that will be read from disk, which is
+   *                       generally greater than or equal to the amount the user
+   *                       requested, due to alignment requirements or read-ahead
+   * @left [in/out] - total amount of bytes that still needs to be copied to the user
+   *                  buffer; it is reduced by the amount of bytes actually copied
+   * @buffer - buffer to use
+   * @dest - user-supplied buffer
+   */
+  SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
+                         size_t bytes_to_read, size_t& left,
+                         AlignedBuffer& buffer, char* dest) const {
+    assert(buffer.CurrentSize() == 0);
+    assert(buffer.Capacity() >= bytes_to_read);
+
+    SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read,
+                         first_page_start);
+
+    if (read > 0) {
+      buffer.Size(read);
+
+      // Let's figure out how much we read from the user's standpoint
+      if ((first_page_start + buffer.CurrentSize()) > user_offset) {
+        assert(first_page_start <= user_offset);
+        size_t buffer_offset = user_offset - first_page_start;
+        read = buffer.Read(dest, buffer_offset, left);
+      } else {
+        read = 0;
+      }
+      left -= read;
+    }
+    return read;
+  }
+
+  SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
+                                size_t bytes_to_read, size_t& left,
+                                char* dest) const {
+    AlignedBuffer bigBuffer;
+    bigBuffer.Alignment(buffer_.Alignment());
+    bigBuffer.AllocateNewBuffer(bytes_to_read);
+
+    return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
+                          bigBuffer, dest);
+  }
+
+  SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start,
+                                 size_t bytes_to_read, size_t& left,
+                                 char* dest) const {
+    SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
+                                  left, buffer_, dest);
+
+    if (read > 0) {
+      buffered_start_ = first_page_start;
+    }
+
+    return read;
+  }
+
  public:
   WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
                       const EnvOptions& options)
       : filename_(fname),
         hFile_(hFile),
         use_os_buffer_(options.use_os_buffer),
+        read_ahead_(false),
+        compaction_readahead_size_(options.compaction_readahead_size),
+        random_access_max_buffer_size_(options.random_access_max_buffer_size),
         buffer_(),
         buffered_start_(0) {
     assert(!options.use_mmap_reads);
 
     // Unbuffered access, use internal buffer for reads
     if (!use_os_buffer_) {
+      // Do not allocate the buffer until either the first request arrives or
+      // there is a call to allocate a read-ahead buffer
       buffer_.Alignment(alignment);
-      // Random read, no need in a big buffer
-      // We read things in database blocks which are likely to be similar to
-      // the alignment we use.
-      buffer_.AllocateNewBuffer(alignment * 2);
     }
   }
 
@@ -719,6 +789,10 @@ class WinRandomAccessFile : public RandomAccessFile {
     }
   }
 
+  virtual void EnableReadAhead() override {
+    this->Hint(SEQUENTIAL);
+  }
+
   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const override {
     Status s;
@@ -730,7 +804,7 @@ class WinRandomAccessFile : public RandomAccessFile {
     // - use our own aligned buffer
     // - always read at the offset of that is a multiple of alignment
     if (!use_os_buffer_) {
-      std::lock_guard<std::mutex> lg(buffer_mut_);
+      std::unique_lock<std::mutex> lock(buffer_mut_);
 
       // Let's see if at least some of the requested data is already
       // in the buffer
@@ -749,40 +823,40 @@
       if (left > 0) {
         // Figure out the start/end offset for reading and amount to read
         const size_t alignment = buffer_.Alignment();
-        const size_t start_page_start =
-            TruncateToPageBoundary(alignment, offset);
-        const size_t end_page_start =
-            TruncateToPageBoundary(alignment, offset + left - 1);
+        const size_t first_page_start =
+            TruncateToPageBoundary(alignment, offset);
+
+        size_t bytes_requested = left;
+        if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
+          bytes_requested = compaction_readahead_size_;
+        }
+
+        const size_t last_page_start =
+            TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
         const size_t actual_bytes_toread =
-            (end_page_start - start_page_start) + alignment;
+            (last_page_start - first_page_start) + alignment;
 
         if (buffer_.Capacity() < actual_bytes_toread) {
-          buffer_.AllocateNewBuffer(actual_bytes_toread);
+          // If we are in read-ahead mode or the requested size exceeds the
+          // max buffer size, then use a one-shot big buffer; otherwise
+          // reallocate the main buffer
+          if (read_ahead_ ||
+              (actual_bytes_toread > random_access_max_buffer_size_)) {
+            // Unlock the mutex since we are not using the instance buffer
+            lock.unlock();
+            r = ReadIntoOneShotBuffer(offset, first_page_start,
+                                      actual_bytes_toread, left, dest);
+          } else {
+            buffer_.AllocateNewBuffer(actual_bytes_toread);
+            r = ReadIntoInstanceBuffer(offset, first_page_start,
+                                       actual_bytes_toread, left, dest);
+          }
         } else {
           buffer_.Clear();
-        }
-
-        SSIZE_T read = 0;
-        read = pread(hFile_, buffer_.Destination(), actual_bytes_toread,
-                     start_page_start);
-
-        if (read > 0) {
-          buffer_.Size(read);
-          buffered_start_ = start_page_start;
-
-          // Let's figure out how much we read from the users standpoint
-          if ((buffered_start_ + uint64_t(read)) > offset) {
-            size_t buffer_offset = offset - buffered_start_;
-            r = buffer_.Read(dest, buffer_offset, left);
-          } else {
-            r = 0;
-          }
-          left -= r;
-        } else {
-          r = read;
+          r = ReadIntoInstanceBuffer(offset, first_page_start,
+                                     actual_bytes_toread, left, dest);
         }
       }
-
     } else {
       r = pread(hFile_, scratch, left, offset);
       if (r > 0) {
@@ -802,7 +876,23 @@
     return true;
   }
 
-  virtual void Hint(AccessPattern pattern) override {}
+  virtual void Hint(AccessPattern pattern) override {
+    if (pattern == SEQUENTIAL && !use_os_buffer_ &&
+        compaction_readahead_size_ > 0) {
+      std::lock_guard<std::mutex> lg(buffer_mut_);
+      if (!read_ahead_) {
+        read_ahead_ = true;
+        // This would allocate read-ahead size + 2 alignments:
+        // - one for memory alignment, which is added implicitly by
+        //   AlignedBuffer
+        // - one more alignment because we will read one alignment more
+        //   from disk
+        buffer_.AllocateNewBuffer(compaction_readahead_size_ +
+                                  buffer_.Alignment());
+      }
+    }
+  }
 
   virtual Status InvalidateCache(size_t offset, size_t length) override {
     return Status::OK();
diff --git a/util/env.cc b/util/env.cc
index df45d9804d..2c2339eaf3 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -292,6 +292,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
   env_options->use_mmap_writes = options.allow_mmap_writes;
   env_options->set_fd_cloexec = options.is_fd_close_on_exec;
   env_options->bytes_per_sync = options.bytes_per_sync;
+  env_options->compaction_readahead_size = options.compaction_readahead_size;
+  env_options->random_access_max_buffer_size = options.random_access_max_buffer_size;
   env_options->rate_limiter = options.rate_limiter.get();
   env_options->allow_fallocate = options.allow_fallocate;
 }
diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc
index ff459262c1..95e8c3998d 100644
--- a/util/file_reader_writer.cc
+++ b/util/file_reader_writer.cc
@@ -384,9 +384,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
       : file_(std::move(file)),
         readahead_size_(readahead_size),
         forward_calls_(file_->ShouldForwardRawRequest()),
-        buffer_(new char[readahead_size_]),
+        buffer_(),
         buffer_offset_(0),
-        buffer_len_(0) {}
+        buffer_len_(0) {
+    if (!forward_calls_) {
+      buffer_.reset(new char[readahead_size_]);
+    } else if (readahead_size_ > 0) {
+      file_->EnableReadAhead();
+    }
+  }
 
   ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
diff --git a/util/options.cc b/util/options.cc
index 70bcf70639..ff7f5b2ce8 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -249,6 +249,7 @@ DBOptions::DBOptions()
       access_hint_on_compaction_start(NORMAL),
       new_table_reader_for_compaction_inputs(false),
       compaction_readahead_size(0),
+      random_access_max_buffer_size(1024 * 1024),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
       wal_bytes_per_sync(0),
@@ -305,6 +306,7 @@ DBOptions::DBOptions(const Options& options)
       new_table_reader_for_compaction_inputs(
           options.new_table_reader_for_compaction_inputs),
       compaction_readahead_size(options.compaction_readahead_size),
+      random_access_max_buffer_size(options.random_access_max_buffer_size),
       use_adaptive_mutex(options.use_adaptive_mutex),
       bytes_per_sync(options.bytes_per_sync),
       wal_bytes_per_sync(options.wal_bytes_per_sync),
@@ -393,6 +395,10 @@ void DBOptions::Dump(Logger* log) const {
          " Options.compaction_readahead_size: %" ROCKSDB_PRIszt
          "d",
          compaction_readahead_size);
+  Header(log,
+         " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
+         "d",
+         random_access_max_buffer_size);
   Header(log, " Options.use_adaptive_mutex: %d",
          use_adaptive_mutex);
   Header(log, " Options.rate_limiter: %p",
diff --git a/util/options_helper.h b/util/options_helper.h
index 3f6eab40ad..052bafd25c 100644
--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -180,6 +180,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
     {"compaction_readahead_size",
      {offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
       OptionVerificationType::kNormal}},
+    {"random_access_max_buffer_size",
+     {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT,
+      OptionVerificationType::kNormal}},
     {"use_adaptive_mutex",
      {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
       OptionVerificationType::kNormal}},
diff --git a/util/options_test.cc b/util/options_test.cc
index e1849bb3fe..542151adb4 100644
--- a/util/options_test.cc
+++ b/util/options_test.cc
@@ -339,6 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
       {"use_adaptive_mutex", "false"},
       {"new_table_reader_for_compaction_inputs", "true"},
       {"compaction_readahead_size", "100"},
+      {"random_access_max_buffer_size", "3145728"},
       {"bytes_per_sync", "47"},
       {"wal_bytes_per_sync", "48"},
   };
@@ -449,6 +450,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
   ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
+  ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
   ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
   ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
 }

From 1277a48f1b155ffa81b8f2c6b2fdd1597e6c5998 Mon Sep 17 00:00:00 2001
From: Dmitri Smirnov
Date: Thu, 29 Oct 2015 11:34:34 -0700
Subject: [PATCH 2/2] Fix 80 character limit issue.

---
 db/db_bench.cc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/db/db_bench.cc b/db/db_bench.cc
index 9ba04a24b8..e3edc83967 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -373,7 +373,8 @@ DEFINE_int32(new_table_reader_for_compaction_inputs, true,
 
 DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
 
-DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum Windows random access buffer size");
+DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
+             "Maximum Windows random access buffer size");
 
 DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
              " use default settings.");
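Usage sketch (not part of the patches above): the snippet below shows how an
application might set the new option, both directly on DBOptions and through
the string map form that the options_helper change makes possible. It assumes
GetDBOptionsFromMap() is exposed via a convenience header (its exact location
varies across RocksDB versions), and the values are purely illustrative.

  #include <string>
  #include <unordered_map>

  #include "rocksdb/convenience.h"  // assumed location of GetDBOptionsFromMap()
  #include "rocksdb/options.h"

  int main() {
    rocksdb::DBOptions db_opt;

    // Cap the reusable aligned buffer used for unbuffered random reads on
    // Windows; bigger requests fall back to one-shot buffers (see the
    // DBOptions comment added in include/rocksdb/options.h above).
    db_opt.random_access_max_buffer_size = 1024 * 1024;  // the new default

    // A non-zero compaction_readahead_size lets Hint(SEQUENTIAL) /
    // EnableReadAhead() pre-allocate the buffer to that size instead.
    db_opt.compaction_readahead_size = 2 * 1024 * 1024;

    // Equivalent string form, as exercised by the GetOptionsFromMapTest
    // change above.
    std::unordered_map<std::string, std::string> opts_map = {
        {"random_access_max_buffer_size", "3145728"},
        {"compaction_readahead_size", "2097152"},
    };
    rocksdb::DBOptions parsed;
    rocksdb::Status s = rocksdb::GetDBOptionsFromMap(db_opt, opts_map, &parsed);
    return s.ok() ? 0 : 1;
  }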