mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-02 10:15:54 +00:00
8515bd50c9
Summary: Added rate limiter and read rate-limiting support to SequentialFileReader. I've updated call sites to SequentialFileReader::Read with appropriate IO priority (or left a TODO and specified IO_TOTAL for now). The PR is separated into four commits: the first one added the rate-limiting support, but with some fixes in the unit test since the number of request bytes from rate limiter in SequentialFileReader are not accurate (there is overcharge at EOF). The second commit fixed this by allowing SequentialFileReader to check file size and determine how many bytes are left in the file to read. The third commit added benchmark related code. The fourth commit moved the logic of using file size to avoid overcharging the rate limiter into backup engine (the main user of SequentialFileReader). Pull Request resolved: https://github.com/facebook/rocksdb/pull/9973 Test Plan: - `make check`, backup_engine_test covers usage of SequentialFileReader with rate limiter. - Run db_bench to check if rate limiting is throttling as expected: Verified that reads and writes are together throttled at 2MB/s, and at 0.2MB chunks that are 100ms apart. - Set up: `./db_bench --benchmarks=fillrandom -db=/dev/shm/test_rocksdb` - Benchmark: ``` strace -ttfe read,write ./db_bench --benchmarks=backup -db=/dev/shm/test_rocksdb --backup_rate_limit=2097152 --use_existing_db strace -ttfe read,write ./db_bench --benchmarks=restore -db=/dev/shm/test_rocksdb --restore_rate_limit=2097152 --use_existing_db ``` - db bench on backup and restore to ensure no performance regression. 
- backup (avg over 50 runs): pre-change: 1.90443e+06 micros/op; post-change: 1.8993e+06 micros/op (improve by 0.2%) - restore (avg over 50 runs): pre-change: 1.79105e+06 micros/op; post-change: 1.78192e+06 micros/op (improve by 0.5%) ``` # Set up ./db_bench --benchmarks=fillrandom -db=/tmp/test_rocksdb -num=10000000 # benchmark TEST_TMPDIR=/tmp/test_rocksdb NUM_RUN=50 for ((j=0;j<$NUM_RUN;j++)) do ./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=backup -use_existing_db | egrep 'backup' # Restore #./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=restore -use_existing_db done > rate_limit.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' rate_limit.txt >> rate_limit_2.txt ``` Reviewed By: hx235 Differential Revision: D36327418 Pulled By: cbi42 fbshipit-source-id: e75d4307cff815945482df5ba630c1e88d064691
329 lines
12 KiB
C++
329 lines
12 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "file/sequence_file_reader.h"
|
|
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
|
|
#include "file/read_write_util.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "port/port.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/aligned_buffer.h"
|
|
#include "util/random.h"
|
|
#include "util/rate_limiter.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Opens `fname` through `fs` and, on success, stores a new
// SequentialFileReader wrapping it in `*reader`. On failure `*reader` is not
// modified and the error from NewSequentialFile() is returned unchanged.
IOStatus SequentialFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
    IODebugContext* dbg, RateLimiter* rate_limiter) {
  std::unique_ptr<FSSequentialFile> fs_file;
  IOStatus status = fs->NewSequentialFile(fname, file_opts, &fs_file, dbg);
  if (!status.ok()) {
    return status;
  }
  reader->reset(new SequentialFileReader(std::move(fs_file), fname, nullptr,
                                         {}, rate_limiter));
  return status;
}
|
|
|
|
// Reads up to `n` bytes into `scratch` and points `*result` at the bytes that
// were obtained. A short result (fewer than `n` bytes) is returned when the
// underlying file returns less than requested or an error occurs.
//
// When `rate_limiter_` is set and `rate_limiter_priority` is not
// Env::IO_TOTAL, each underlying read is preceded by a RequestToken() call
// and may therefore be split into several smaller reads.
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
                                    Env::IOPriority rate_limiter_priority) {
  // Env::IO_TOTAL opts out of charging the rate limiter.
  const bool charge_rate_limiter =
      rate_limiter_ != nullptr && rate_limiter_priority != Env::IO_TOTAL;
  IOStatus io_s;

  if (!use_direct_io()) {
    // To be paranoid, modify scratch a little bit, so in case underlying
    // FileSystem doesn't fill the buffer but return success and `scratch`
    // contains a previous block, the returned value will not pass checksum.
    // It's hard to find a useful byte for the direct I/O case, so we skip it
    // there.
    if (n > 0 && scratch != nullptr) {
      scratch[0]++;
    }

    size_t bytes_read = 0;
    while (bytes_read < n) {
      // Unthrottled, the whole request goes out in a single call, so the loop
      // body runs at most once.
      size_t allowed = n;
      if (charge_rate_limiter) {
        allowed = rate_limiter_->RequestToken(
            n - bytes_read, 0 /* alignment */, rate_limiter_priority,
            nullptr /* stats */, RateLimiter::OpType::kRead);
      }

#ifndef ROCKSDB_LITE
      FileOperationInfo::StartTimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
      }
#endif
      Slice chunk;
      io_s = file_->Read(allowed, IOOptions(), &chunk, scratch + bytes_read,
                         nullptr /* dbg */);
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        // On this buffered path offset_ is advanced only while notifying
        // listeners.
        size_t chunk_offset = offset_.fetch_add(chunk.size());
        NotifyOnFileReadFinish(chunk_offset, chunk.size(), start_ts, finish_ts,
                               io_s);
      }
#endif
      bytes_read += chunk.size();
      if (!io_s.ok() || chunk.size() < allowed) {
        break;
      }
    }
    *result = Slice(scratch, bytes_read);
  } else {
#ifndef ROCKSDB_LITE
    // Direct I/O reads whole aligned units and then copies out the caller's
    // window:
    //
    // |-offset_advance-|---bytes returned--|
    // |----------------------buf size-------------------------|
    // |                |                   |                  |
    // aligned        offset            offset + n  Roundup(offset + n,
    // offset                                             alignment)
    //
    const size_t offset = offset_.fetch_add(n);
    const size_t alignment = file_->GetRequiredBufferAlignment();
    const size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
    const size_t offset_advance = offset - aligned_offset;
    const size_t size = Roundup(offset + n, alignment) - aligned_offset;

    AlignedBuffer buf;
    buf.Alignment(alignment);
    buf.AllocateNewBuffer(size);

    while (buf.CurrentSize() < size) {
      // File position of the next chunk to fetch into `buf`.
      const uint64_t chunk_pos = aligned_offset + buf.CurrentSize();

      size_t allowed = size;
      if (charge_rate_limiter) {
        allowed = rate_limiter_->RequestToken(
            buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
            rate_limiter_priority, nullptr /* stats */,
            RateLimiter::OpType::kRead);
      } else {
        // Unthrottled: the entire aligned range is requested at once, so this
        // can only be the first iteration.
        assert(buf.CurrentSize() == 0);
      }

      FileOperationInfo::StartTimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = FileOperationInfo::StartNow();
      }
      Slice chunk;
      io_s = file_->PositionedRead(chunk_pos, allowed, IOOptions(), &chunk,
                                   buf.Destination(), nullptr /* dbg */);
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(chunk_pos, chunk.size(), start_ts, finish_ts,
                               io_s);
      }
      buf.Size(buf.CurrentSize() + chunk.size());
      if (!io_s.ok() || chunk.size() < allowed) {
        break;
      }
    }

    // Copy [offset, offset + n) out of the aligned buffer, clamped to what
    // was actually read; nothing is copied on error.
    size_t copied = 0;
    if (io_s.ok() && buf.CurrentSize() > offset_advance) {
      copied = buf.Read(scratch, offset_advance,
                        std::min(buf.CurrentSize() - offset_advance, n));
    }
    *result = Slice(scratch, copied);
#endif  // !ROCKSDB_LITE
  }

  IOSTATS_ADD(bytes_read, result->size());
  return io_s;
}
|
|
|
|
// Advances the read position by `n` bytes without returning any data.
IOStatus SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
  // In direct I/O mode, Read() issues positional reads at offset_, so skipping
  // only moves that cursor forward; the file itself is not touched.
  const bool positional = use_direct_io();
  if (positional) {
    offset_ += static_cast<size_t>(n);
    return IOStatus::OK();
  }
#endif  // !ROCKSDB_LITE
  // Otherwise the wrapped file keeps its own cursor; delegate to it.
  return file_->Skip(n);
}
|
|
|
|
namespace {
|
|
// This class wraps a SequentialFile, exposing same API, with the differenece
|
|
// of being able to prefetch up to readahead_size bytes and then serve them
|
|
// from memory, avoiding the entire round-trip if, for example, the data for the
|
|
// file is actually remote.
|
|
class ReadaheadSequentialFile : public FSSequentialFile {
|
|
public:
|
|
ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
|
|
size_t readahead_size)
|
|
: file_(std::move(file)),
|
|
alignment_(file_->GetRequiredBufferAlignment()),
|
|
readahead_size_(Roundup(readahead_size, alignment_)),
|
|
buffer_(),
|
|
buffer_offset_(0),
|
|
read_offset_(0) {
|
|
buffer_.Alignment(alignment_);
|
|
buffer_.AllocateNewBuffer(readahead_size_);
|
|
}
|
|
|
|
ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
|
|
|
|
ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
|
|
|
|
IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
|
|
IODebugContext* dbg) override {
|
|
std::unique_lock<std::mutex> lk(lock_);
|
|
|
|
size_t cached_len = 0;
|
|
// Check if there is a cache hit, meaning that [offset, offset + n) is
|
|
// either completely or partially in the buffer. If it's completely cached,
|
|
// including end of file case when offset + n is greater than EOF, then
|
|
// return.
|
|
if (TryReadFromCache(n, &cached_len, scratch) &&
|
|
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
|
|
// We read exactly what we needed, or we hit end of file - return.
|
|
*result = Slice(scratch, cached_len);
|
|
return IOStatus::OK();
|
|
}
|
|
n -= cached_len;
|
|
|
|
IOStatus s;
|
|
// Read-ahead only make sense if we have some slack left after reading
|
|
if (n + alignment_ >= readahead_size_) {
|
|
s = file_->Read(n, opts, result, scratch + cached_len, dbg);
|
|
if (s.ok()) {
|
|
read_offset_ += result->size();
|
|
*result = Slice(scratch, cached_len + result->size());
|
|
}
|
|
buffer_.Clear();
|
|
return s;
|
|
}
|
|
|
|
s = ReadIntoBuffer(readahead_size_, opts, dbg);
|
|
if (s.ok()) {
|
|
// The data we need is now in cache, so we can safely read it
|
|
size_t remaining_len;
|
|
TryReadFromCache(n, &remaining_len, scratch + cached_len);
|
|
*result = Slice(scratch, cached_len + remaining_len);
|
|
}
|
|
return s;
|
|
}
|
|
|
|
IOStatus Skip(uint64_t n) override {
|
|
std::unique_lock<std::mutex> lk(lock_);
|
|
IOStatus s = IOStatus::OK();
|
|
// First check if we need to skip already cached data
|
|
if (buffer_.CurrentSize() > 0) {
|
|
// Do we need to skip beyond cached data?
|
|
if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
|
|
// Yes. Skip whaterver is in memory and adjust offset accordingly
|
|
n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
|
|
read_offset_ = buffer_offset_ + buffer_.CurrentSize();
|
|
} else {
|
|
// No. The entire section to be skipped is entirely i cache.
|
|
read_offset_ += n;
|
|
n = 0;
|
|
}
|
|
}
|
|
if (n > 0) {
|
|
// We still need to skip more, so call the file API for skipping
|
|
s = file_->Skip(n);
|
|
if (s.ok()) {
|
|
read_offset_ += n;
|
|
}
|
|
buffer_.Clear();
|
|
}
|
|
return s;
|
|
}
|
|
|
|
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
|
|
Slice* result, char* scratch,
|
|
IODebugContext* dbg) override {
|
|
return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
|
|
}
|
|
|
|
IOStatus InvalidateCache(size_t offset, size_t length) override {
|
|
std::unique_lock<std::mutex> lk(lock_);
|
|
buffer_.Clear();
|
|
return file_->InvalidateCache(offset, length);
|
|
}
|
|
|
|
bool use_direct_io() const override { return file_->use_direct_io(); }
|
|
|
|
private:
|
|
// Tries to read from buffer_ n bytes. If anything was read from the cache, it
|
|
// sets cached_len to the number of bytes actually read, copies these number
|
|
// of bytes to scratch and returns true.
|
|
// If nothing was read sets cached_len to 0 and returns false.
|
|
bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
|
|
if (read_offset_ < buffer_offset_ ||
|
|
read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
|
|
*cached_len = 0;
|
|
return false;
|
|
}
|
|
uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
|
|
*cached_len = std::min(
|
|
buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
|
|
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
|
|
read_offset_ += *cached_len;
|
|
return true;
|
|
}
|
|
|
|
// Reads into buffer_ the next n bytes from file_.
|
|
// Can actually read less if EOF was reached.
|
|
// Returns the status of the read operastion on the file.
|
|
IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
|
|
IODebugContext* dbg) {
|
|
if (n > buffer_.Capacity()) {
|
|
n = buffer_.Capacity();
|
|
}
|
|
assert(IsFileSectorAligned(n, alignment_));
|
|
Slice result;
|
|
IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
|
|
if (s.ok()) {
|
|
buffer_offset_ = read_offset_;
|
|
buffer_.Size(result.size());
|
|
assert(result.size() == 0 || buffer_.BufferStart() == result.data());
|
|
}
|
|
return s;
|
|
}
|
|
|
|
const std::unique_ptr<FSSequentialFile> file_;
|
|
const size_t alignment_;
|
|
const size_t readahead_size_;
|
|
|
|
std::mutex lock_;
|
|
// The buffer storing the prefetched data
|
|
AlignedBuffer buffer_;
|
|
// The offset in file_, corresponding to data stored in buffer_
|
|
uint64_t buffer_offset_;
|
|
// The offset up to which data was read from file_. In fact, it can be larger
|
|
// than the actual file size, since the file_->Skip(n) call doesn't return the
|
|
// actual number of bytes that were skipped, which can be less than n.
|
|
// This is not a problemm since read_offset_ is monotonically increasing and
|
|
// its only use is to figure out if next piece of data should be read from
|
|
// buffer_ or file_ directly.
|
|
uint64_t read_offset_;
|
|
};
|
|
} // namespace
|
|
|
|
std::unique_ptr<FSSequentialFile>
|
|
SequentialFileReader::NewReadaheadSequentialFile(
|
|
std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
|
|
if (file->GetRequiredBufferAlignment() >= readahead_size) {
|
|
// Short-circuit and return the original file if readahead_size is
|
|
// too small and hence doesn't make sense to be used for prefetching.
|
|
return std::move(file);
|
|
}
|
|
std::unique_ptr<FSSequentialFile> result(
|
|
new ReadaheadSequentialFile(std::move(file), readahead_size));
|
|
return result;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|