From 534357ca3aa1216d39fa88c32d2ed17ff04308bc Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Tue, 8 Jul 2014 12:31:49 -0700 Subject: [PATCH] integrate rate limiter into rocksdb Summary: Add option and plugin rate limiter for PosixWritableFile. The rate limiter only applies to flush and compaction. WAL and MANIFEST are excluded from this enforcement. Test Plan: db_test Reviewers: igor, yhchiang, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19425 --- HISTORY.md | 1 + db/builder.cc | 4 ++- db/builder.h | 4 ++- db/db_impl.cc | 5 +-- db/db_test.cc | 72 ++++++++++++++++++++++++++++++++++++++- include/rocksdb/env.h | 20 ++++++++++- include/rocksdb/options.h | 18 ++++++---- util/env.cc | 1 + util/env_posix.cc | 22 +++++++++--- util/options.cc | 2 ++ 10 files changed, 132 insertions(+), 17 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index a4ef5d659c..a334de234d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). * RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by the 2X average value size, which makes compaction to be triggerred by deletion entries more easily. * Add TimeOut API to write. Now WriteOptions have a variable called timeout_hint_us. With timeout_hint_us set to non-zero, any write associated with this timeout_hint_us may be aborted when it runs longer than the specified timeout_hint_us, and it is guaranteed that any write completes earlier than the specified time-out will not be aborted due to the time-out condition. +* Add a rate_limiter option, which controls total throughput of flush and compaction. The throughput is specified in bytes/sec. 
Flush always has precedence over compaction when available bandwidth is constrained. ## 3.2.0 (06/20/2014) diff --git a/db/builder.cc b/db/builder.cc index 3be61bd10c..c84e92ec08 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -40,7 +40,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression) { + const CompressionType compression, + const Env::IOPriority io_priority) { Status s; meta->fd.file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -62,6 +63,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, if (!s.ok()) { return s; } + file->SetIOPriority(io_priority); TableBuilder* builder = NewTableBuilder(options, internal_comparator, file.get(), compression); diff --git a/db/builder.h b/db/builder.h index 68eb3fc6fd..f57501abd1 100644 --- a/db/builder.h +++ b/db/builder.h @@ -7,6 +7,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once #include "rocksdb/comparator.h" +#include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/types.h" #include "rocksdb/options.h" @@ -40,6 +41,7 @@ extern Status BuildTable(const std::string& dbname, Env* env, const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression); + const CompressionType compression, + const Env::IOPriority io_priority = Env::IO_HIGH); } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index 6862f9a776..2b96cdee5a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1406,7 +1406,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options())); + GetCompressionFlush(*cfd->options()), Env::IO_HIGH); LogFlush(options_.info_log); mutex_.Lock(); } @@ -1473,7 +1473,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options())); + GetCompressionFlush(*cfd->options()), Env::IO_HIGH); LogFlush(options_.info_log); delete iter; Log(options_.info_log, @@ -2385,6 +2385,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_); if (s.ok()) { + compact->outfile->SetIOPriority(Env::IO_LOW); compact->outfile->SetPreallocationBlockSize( compact->compaction->OutputFilePreallocationSize()); diff --git a/db/db_test.cc b/db/db_test.cc index 025e04f24a..23f6bc1e2e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -27,6 +27,7 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" 
#include "rocksdb/table.h" +#include "rocksdb/options.h" #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" #include "table/plain_table_factory.h" @@ -35,6 +36,7 @@ #include "utilities/merge_operators.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/rate_limiter.h" #include "util/statistics.h" #include "util/testharness.h" #include "util/sync_point.h" @@ -135,6 +137,8 @@ class SpecialEnv : public EnvWrapper { anon::AtomicCounter sleep_counter_; + std::atomic bytes_written_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(nullptr); no_space_.Release_Store(nullptr); @@ -144,7 +148,8 @@ class SpecialEnv : public EnvWrapper { manifest_sync_error_.Release_Store(nullptr); manifest_write_error_.Release_Store(nullptr); log_write_error_.Release_Store(nullptr); - } + bytes_written_ = 0; + } Status NewWritableFile(const std::string& f, unique_ptr* r, const EnvOptions& soptions) { @@ -163,6 +168,7 @@ class SpecialEnv : public EnvWrapper { // Drop writes on the floor return Status::OK(); } else { + env_->bytes_written_ += data.size(); return base_->Append(data); } } @@ -174,6 +180,9 @@ class SpecialEnv : public EnvWrapper { } return base_->Sync(); } + void SetIOPriority(Env::IOPriority pri) { + base_->SetIOPriority(pri); + } }; class ManifestFile : public WritableFile { private: @@ -7124,6 +7133,67 @@ TEST(DBTest, MTRandomTimeoutTest) { } // anonymous namespace +TEST(DBTest, RateLimitingTest) { + Options options = CurrentOptions(); + options.write_buffer_size = 1 << 20; // 1MB + options.level0_file_num_compaction_trigger = 10; + options.target_file_size_base = 1 << 20; // 1MB + options.max_bytes_for_level_base = 10 << 20; // 10MB + options.compression = kNoCompression; + options.create_if_missing = true; + options.env = env_; + DestroyAndReopen(&options); + + // # no rate limiting + Random rnd(301); + uint64_t start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < 
(32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + uint64_t elapsed = env_->NowMicros() - start; + double raw_rate = env_->bytes_written_ * 1000000 / elapsed; + Close(); + + // # rate limiting with 0.7 x threshold + options.rate_limiter.reset( + NewRateLimiter(static_cast<int64_t>(0.7 * raw_rate))); + env_->bytes_written_ = 0; + DestroyAndReopen(&options); + + start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < (32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + elapsed = env_->NowMicros() - start; + Close(); + ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() == + env_->bytes_written_); + double ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; + fprintf(stderr, "write rate ratio = %.2lf, expected 0.7\n", ratio); + ASSERT_TRUE(ratio > 0.6 && ratio < 0.8); + + // # rate limiting with half of the raw_rate + options.rate_limiter.reset( + NewRateLimiter(static_cast<int64_t>(raw_rate / 2))); + env_->bytes_written_ = 0; + DestroyAndReopen(&options); + + start = env_->NowMicros(); + // Write ~32M data + for (int64_t i = 0; i < (32 << 10); ++i) { + ASSERT_OK(Put(std::to_string(i), RandomString(&rnd, (1 << 10) + 1))); + } + elapsed = env_->NowMicros() - start; + Close(); + ASSERT_TRUE(options.rate_limiter->GetTotalBytesThrough() == + env_->bytes_written_); + ratio = env_->bytes_written_ * 1000000 / elapsed / raw_rate; + fprintf(stderr, "write rate ratio = %.2lf, expected 0.5\n", ratio); + ASSERT_TRUE(ratio > 0.4 && ratio < 0.6); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 9698c66aed..fc4665d903 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -35,6 +35,7 @@ class WritableFile; class RandomRWFile; class Directory; struct DBOptions; +class RateLimiter; using std::unique_ptr; using std::shared_ptr; @@ -74,6 +75,9 @@ struct EnvOptions { // write. 
By default, we set it to true for MANIFEST writes and false for // WAL writes bool fallocate_with_keep_size = true; + + // If not nullptr, write rate limiting is enabled for flush and compaction + RateLimiter* rate_limiter = nullptr; }; class Env { @@ -379,7 +383,10 @@ class RandomAccessFile { // at a time to the file. class WritableFile { public: - WritableFile() : last_preallocated_block_(0), preallocation_block_size_ (0) { + WritableFile() + : last_preallocated_block_(0), + preallocation_block_size_(0), + io_priority_(Env::IO_TOTAL) { } virtual ~WritableFile(); @@ -398,6 +405,14 @@ class WritableFile { return Sync(); } + /* + * Change the priority in rate limiter if rate limiting is enabled. + * If rate limiting is not enabled, this call has no effect. + */ + virtual void SetIOPriority(Env::IOPriority pri) { + io_priority_ = pri; + } + /* * Get the size of valid data in the file. */ @@ -482,6 +497,9 @@ class WritableFile { // No copying allowed WritableFile(const WritableFile&); void operator=(const WritableFile&); + + protected: + Env::IOPriority io_priority_; }; // A file abstraction for random reading and writing. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f1f807c159..dea14f2a7b 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -39,8 +39,7 @@ class Slice; class SliceTransform; class Statistics; class InternalKeyComparator; - -using std::shared_ptr; +class RateLimiter; // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before @@ -133,7 +132,7 @@ struct ColumnFamilyOptions { // for the first time. It's necessary to specify a merge operator when // openning the DB in this case. // Default: nullptr - shared_ptr<MergeOperator> merge_operator; + std::shared_ptr<MergeOperator> merge_operator; // A single CompactionFilter instance to call into during compaction. 
// Allows an application to modify/delete a key-value during background @@ -206,12 +205,12 @@ struct ColumnFamilyOptions { // If non-NULL use the specified cache for blocks. // If NULL, rocksdb will automatically create and use an 8MB internal cache. // Default: nullptr - shared_ptr<Cache> block_cache; + std::shared_ptr<Cache> block_cache; // If non-NULL use the specified cache for compressed blocks. // If NULL, rocksdb will not use a compressed block cache. // Default: nullptr - shared_ptr<Cache> block_cache_compressed; + std::shared_ptr<Cache> block_cache_compressed; // Approximate size of user data packed per block. Note that the // block size specified here corresponds to uncompressed data. The @@ -626,11 +625,16 @@ struct DBOptions { // Default: Env::Default() Env* env; + // Use to control write rate of flush and compaction. Flush has higher + // priority than compaction. Rate limiting is disabled if nullptr. + // Default: nullptr + std::shared_ptr<RateLimiter> rate_limiter; + // Any internal progress/error information generated by the db will // be written to info_log if it is non-nullptr, or to a file stored // in the same directory as the DB contents if info_log is nullptr. // Default: nullptr - shared_ptr<Logger> info_log; + std::shared_ptr<Logger> info_log; InfoLogLevel info_log_level; @@ -653,7 +657,7 @@ struct DBOptions { // If non-null, then we should collect metrics about database operations // Statistics objects should not be shared between DB instances as // it does not use any locks to prevent concurrent updates. - shared_ptr<Statistics> statistics; + std::shared_ptr<Statistics> statistics; // If true, then the contents of data files are not synced // to stable storage. 
Their contents remain in the OS buffers till the diff --git a/util/env.cc b/util/env.cc index 1c0cae4c34..91ae0784b5 100644 --- a/util/env.cc +++ b/util/env.cc @@ -226,6 +226,7 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { env_options->use_mmap_writes = options.allow_mmap_writes; env_options->set_fd_cloexec = options.is_fd_close_on_exec; env_options->bytes_per_sync = options.bytes_per_sync; + env_options->rate_limiter = options.rate_limiter.get(); } } diff --git a/util/env_posix.cc b/util/env_posix.cc index a73ec6b0ea..dc8696e557 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -33,6 +33,8 @@ #if defined(LEVELDB_PLATFORM_ANDROID) #include #endif +#include +#include #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "port/port.h" @@ -41,7 +43,7 @@ #include "util/posix_logger.h" #include "util/random.h" #include "util/iostats_context_imp.h" -#include +#include "util/rate_limiter.h" // Get nano time for mach systems #ifdef __MACH__ @@ -634,6 +636,7 @@ class PosixWritableFile : public WritableFile { #ifdef ROCKSDB_FALLOCATE_PRESENT bool fallocate_with_keep_size_; #endif + RateLimiter* rate_limiter_; public: PosixWritableFile(const std::string& fname, int fd, size_t capacity, @@ -647,7 +650,8 @@ class PosixWritableFile : public WritableFile { pending_sync_(false), pending_fsync_(false), last_sync_size_(0), - bytes_per_sync_(options.bytes_per_sync) { + bytes_per_sync_(options.bytes_per_sync), + rate_limiter_(options.rate_limiter) { #ifdef ROCKSDB_FALLOCATE_PRESENT fallocate_with_keep_size_ = options.fallocate_with_keep_size; #endif @@ -691,7 +695,7 @@ class PosixWritableFile : public WritableFile { cursize_ += left; } else { while (left != 0) { - ssize_t done = write(fd_, src, left); + ssize_t done = write(fd_, src, RequestToken(left)); if (done < 0) { if (errno == EINTR) { continue; @@ -742,7 +746,7 @@ class PosixWritableFile : public WritableFile { size_t left = cursize_; char* src = buf_.get(); while (left != 0) { - 
ssize_t done = write(fd_, src, left); + ssize_t done = write(fd_, src, RequestToken(left)); if (done < 0) { if (errno == EINTR) { continue; @@ -838,6 +842,16 @@ class PosixWritableFile : public WritableFile { return GetUniqueIdFromFile(fd_, id, max_size); } #endif + + private: + inline size_t RequestToken(size_t bytes) { + if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { + bytes = std::min(bytes, + static_cast<size_t>(rate_limiter_->GetSingleBurstBytes())); + rate_limiter_->Request(bytes, io_priority_); + } + return bytes; + } }; class PosixRandomRWFile : public RandomRWFile { diff --git a/util/options.cc b/util/options.cc index 88d26fa018..e37fe9e746 100644 --- a/util/options.cc +++ b/util/options.cc @@ -166,6 +166,7 @@ DBOptions::DBOptions() error_if_exists(false), paranoid_checks(true), env(Env::Default()), + rate_limiter(nullptr), info_log(nullptr), info_log_level(INFO_LEVEL), max_open_files(5000), @@ -206,6 +207,7 @@ DBOptions::DBOptions(const Options& options) error_if_exists(options.error_if_exists), paranoid_checks(options.paranoid_checks), env(options.env), + rate_limiter(options.rate_limiter), info_log(options.info_log), info_log_level(options.info_log_level), max_open_files(options.max_open_files),