Support read rate-limiting in SequentialFileReader (#9973)

Summary:
Added rate limiter and read rate-limiting support to SequentialFileReader. I've updated call sites to SequentialFileReader::Read with appropriate IO priority (or left a TODO and specified IO_TOTAL for now).

The PR is separated into four commits: the first one added the rate-limiting support, but with some fixes in the unit test since the number of request bytes from rate limiter in SequentialFileReader are not accurate (there is overcharge at EOF). The second commit fixed this by allowing SequentialFileReader to check file size and determine how many bytes are left in the file to read. The third commit added benchmark related code. The fourth commit moved the logic of using file size to avoid overcharging the rate limiter into backup engine (the main user of SequentialFileReader).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9973

Test Plan:
- `make check`, backup_engine_test covers usage of SequentialFileReader with rate limiter.
- Run db_bench to check if rate limiting is throttling as expected: Verified that reads and writes are together throttled at 2MB/s, and at 0.2MB chunks that are 100ms apart.
  - Set up: `./db_bench --benchmarks=fillrandom -db=/dev/shm/test_rocksdb`
  - Benchmark:
```
strace -ttfe read,write ./db_bench --benchmarks=backup -db=/dev/shm/test_rocksdb --backup_rate_limit=2097152 --use_existing_db
strace -ttfe read,write ./db_bench --benchmarks=restore -db=/dev/shm/test_rocksdb --restore_rate_limit=2097152 --use_existing_db
```
- db bench on backup and restore to ensure no performance regression.
  - backup (avg over 50 runs): pre-change: 1.90443e+06 micros/op; post-change: 1.8993e+06 micros/op (improve by 0.2%)
  - restore (avg over 50 runs): pre-change: 1.79105e+06 micros/op; post-change: 1.78192e+06 micros/op (improve by 0.5%)

```
# Set up
./db_bench --benchmarks=fillrandom -db=/tmp/test_rocksdb -num=10000000

# benchmark
TEST_TMPDIR=/tmp/test_rocksdb
NUM_RUN=50
for ((j=0;j<$NUM_RUN;j++))
do
   ./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=backup -use_existing_db | egrep 'backup'
  # Restore
  #./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=restore -use_existing_db
done > rate_limit.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' rate_limit.txt >> rate_limit_2.txt
```

Reviewed By: hx235

Differential Revision: D36327418

Pulled By: cbi42

fbshipit-source-id: e75d4307cff815945482df5ba630c1e88d064691
This commit is contained in:
Changyu Bi 2022-05-24 10:28:57 -07:00 committed by Facebook GitHub Bot
parent fd24e4479b
commit 8515bd50c9
17 changed files with 265 additions and 125 deletions

View File

@ -303,8 +303,14 @@ void Reader::UnmarkEOFInternal() {
}
Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);
// TODO: rate limit log reader with approriate priority.
// TODO: avoid overcharging rate limiter:
// Note that the Read here might overcharge SequentialFileReader's internal
// rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
// content left until EOF to read.
Status status =
file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_,
Env::IO_TOTAL /* rate_limiter_priority */);
size_t added = read_buffer.size();
end_of_buffer_offset_ += added;
@ -349,7 +355,13 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
// TODO: rate limit log reader with approriate priority.
// TODO: avoid overcharging rate limiter:
// Note that the Read here might overcharge SequentialFileReader's internal
// rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
// content left until EOF to read.
Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
Env::IO_TOTAL /* rate_limiter_priority */);
TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
@ -639,7 +651,13 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
// TODO: rate limit log reader with approriate priority.
// TODO: avoid overcharging rate limiter:
// Note that the Read here might overcharge SequentialFileReader's internal
// rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
// content left until EOF to read.
Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
Env::IO_TOTAL /* rate_limiter_priority */);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();

View File

@ -358,7 +358,7 @@ class Repairer {
std::unique_ptr<SequentialFileReader> lfile_reader;
Status status = SequentialFileReader::Create(
fs, logname, fs->OptimizeForLogRead(file_options_), &lfile_reader,
nullptr);
nullptr /* dbg */, nullptr /* rate limiter */);
if (!status.ok()) {
return status;
}

View File

@ -67,8 +67,9 @@ Status TransactionLogIteratorImpl::OpenLogFile(
}
}
if (s.ok()) {
file_reader->reset(new SequentialFileReader(
std::move(file), fname, io_tracer_, options_->listeners));
file_reader->reset(new SequentialFileReader(std::move(file), fname,
io_tracer_, options_->listeners,
options_->rate_limiter.get()));
}
return s;
}

3
env/io_posix.h vendored
View File

@ -187,8 +187,7 @@ class PosixSequentialFile : public FSSequentialFile {
public:
PosixSequentialFile(const std::string& fname, FILE* file, int fd,
size_t logical_block_size,
const EnvOptions& options);
size_t logical_block_size, const EnvOptions& options);
virtual ~PosixSequentialFile();
virtual IOStatus Read(size_t n, const IOOptions& opts, Slice* result,

View File

@ -49,7 +49,10 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source,
Slice slice;
while (size > 0) {
size_t bytes_to_read = std::min(sizeof(buffer), static_cast<size_t>(size));
io_s = status_to_io_status(src_reader->Read(bytes_to_read, &slice, buffer));
// TODO: rate limit copy file
io_s = status_to_io_status(
src_reader->Read(bytes_to_read, &slice, buffer,
Env::IO_TOTAL /* rate_limiter_priority */));
if (!io_s.ok()) {
return io_s;
}

View File

@ -15,16 +15,20 @@ IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg) {
IODebugContext* dbg,
RateLimiter* rate_limiter) {
std::unique_ptr<FSSequentialFile> file;
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new LineFileReader(std::move(file), fname));
reader->reset(new LineFileReader(
std::move(file), fname, nullptr,
std::vector<std::shared_ptr<EventListener>>{}, rate_limiter));
}
return io_s;
}
bool LineFileReader::ReadLine(std::string* out) {
bool LineFileReader::ReadLine(std::string* out,
Env::IOPriority rate_limiter_priority) {
assert(out);
if (!io_status_.ok()) {
// Status should be checked (or permit unchecked) any time we return false.
@ -50,7 +54,8 @@ bool LineFileReader::ReadLine(std::string* out) {
// else flush and reload buffer
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
io_status_ = sfr_.Read(buf_.size(), &result, buf_.data());
io_status_ =
sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority);
IOSTATS_ADD(bytes_read, result.size());
if (!io_status_.ok()) {
io_status_.MustCheck();

View File

@ -32,7 +32,7 @@ class LineFileReader {
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg);
IODebugContext* dbg, RateLimiter* rate_limiter);
LineFileReader(const LineFileReader&) = delete;
LineFileReader& operator=(const LineFileReader&) = delete;
@ -41,7 +41,8 @@ class LineFileReader {
// the line to `out`, without delimiter, or returning false on failure. You
// must check GetStatus() to determine whether the failure was just
// end-of-file (OK status) or an I/O error (another status).
bool ReadLine(std::string* out);
// The internal rate limiter will be charged at the specified priority.
bool ReadLine(std::string* out, Env::IOPriority rate_limiter_priority);
// Returns the number of the line most recently returned from ReadLine.
// Return value is unspecified if ReadLine has returned false due to

View File

@ -283,7 +283,7 @@ IOStatus RandomAccessFileReader::MultiRead(
#endif // !NDEBUG
// To be paranoid modify scratch a little bit, so in case underlying
// FileSystem doesn't fill the buffer but return succee and `scratch` returns
// FileSystem doesn't fill the buffer but return success and `scratch` returns
// contains a previous block, returned value will not pass checksum.
// This byte might not change anything for direct I/O case, but it's OK.
for (size_t i = 0; i < num_reqs; i++) {

View File

@ -25,16 +25,18 @@ namespace ROCKSDB_NAMESPACE {
IOStatus SequentialFileReader::Create(
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg) {
IODebugContext* dbg, RateLimiter* rate_limiter) {
std::unique_ptr<FSSequentialFile> file;
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new SequentialFileReader(std::move(file), fname));
reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {},
rate_limiter));
}
return io_s;
}
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
Env::IOPriority rate_limiter_priority) {
IOStatus io_s;
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
@ -55,30 +57,48 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
uint64_t orig_offset = 0;
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
orig_offset = aligned_offset + buf.CurrentSize();
start_ts = FileOperationInfo::StartNow();
while (buf.CurrentSize() < size) {
size_t allowed;
if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
rate_limiter_priority, nullptr /* stats */,
RateLimiter::OpType::kRead);
} else {
assert(buf.CurrentSize() == 0);
allowed = size;
}
Slice tmp;
uint64_t orig_offset = 0;
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
orig_offset = aligned_offset + buf.CurrentSize();
start_ts = FileOperationInfo::StartNow();
}
io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
IOOptions(), &tmp, buf.Destination(),
nullptr /* dbg */);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
io_s);
}
buf.Size(buf.CurrentSize() + tmp.size());
if (!io_s.ok() || tmp.size() < allowed) {
break;
}
}
io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
buf.BufferStart(), nullptr);
if (io_s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
if (io_s.ok() && offset_advance < buf.CurrentSize()) {
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
std::min(buf.CurrentSize() - offset_advance, n));
}
*result = Slice(scratch, r);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
io_s);
}
#endif // !ROCKSDB_LITE
} else {
// To be paranoid, modify scratch a little bit, so in case underlying
// FileSystem doesn't fill the buffer but return succee and `scratch`
// FileSystem doesn't fill the buffer but return success and `scratch`
// returns contains a previous block, returned value will not pass
// checksum.
// It's hard to find useful byte for direct I/O case, so we skip it.
@ -86,22 +106,38 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
scratch[0]++;
}
size_t read = 0;
while (read < n) {
size_t allowed;
if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
n - read, 0 /* alignment */, rate_limiter_priority,
nullptr /* stats */, RateLimiter::OpType::kRead);
} else {
allowed = n;
}
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
FileOperationInfo::StartTimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
}
#endif
io_s = file_->Read(n, IOOptions(), result, scratch, nullptr);
Slice tmp;
io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
nullptr /* dbg */);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
size_t offset = offset_.fetch_add(result->size());
NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s);
}
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
size_t offset = offset_.fetch_add(tmp.size());
NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
}
#endif
read += tmp.size();
if (!io_s.ok() || tmp.size() < allowed) {
break;
}
}
*result = Slice(scratch, read);
}
IOSTATS_ADD(bytes_read, result->size());
return io_s;

View File

@ -57,15 +57,19 @@ class SequentialFileReader {
FSSequentialFilePtr file_;
std::atomic<size_t> offset_{0}; // read offset
std::vector<std::shared_ptr<EventListener>> listeners_{};
RateLimiter* rate_limiter_;
public:
explicit SequentialFileReader(
std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
RateLimiter* rate_limiter =
nullptr) // TODO: migrate call sites to provide rate limiter
: file_name_(_file_name),
file_(std::move(_file), io_tracer, _file_name),
listeners_() {
listeners_(),
rate_limiter_(rate_limiter) {
#ifndef ROCKSDB_LITE
AddFileIOListeners(listeners);
#else
@ -77,11 +81,14 @@ class SequentialFileReader {
std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
size_t _readahead_size,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
RateLimiter* rate_limiter =
nullptr) // TODO: migrate call sites to provide rate limiter
: file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer, _file_name),
listeners_() {
listeners_(),
rate_limiter_(rate_limiter) {
#ifndef ROCKSDB_LITE
AddFileIOListeners(listeners);
#else
@ -91,12 +98,19 @@ class SequentialFileReader {
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg);
IODebugContext* dbg, RateLimiter* rate_limiter);
SequentialFileReader(const SequentialFileReader&) = delete;
SequentialFileReader& operator=(const SequentialFileReader&) = delete;
IOStatus Read(size_t n, Slice* result, char* scratch);
// `rate_limiter_priority` is used to charge the internal rate limiter when
// enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
// rate limiter. The amount charged to the internal rate limiter is n, even
// when less than n bytes are actually read (e.g. at end of file). To avoid
// overcharging the rate limiter, the caller can use file size to cap n to
// read until end of file.
IOStatus Read(size_t n, Slice* result, char* scratch,
Env::IOPriority rate_limiter_priority);
IOStatus Skip(uint64_t n);

View File

@ -271,7 +271,7 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in,
std::unordered_map<std::string, std::string> opt_map;
std::string line;
// we only support single-lined statement.
while (lf_reader.ReadLine(&line)) {
while (lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
int line_num = static_cast<int>(lf_reader.GetLineNumber());
line = TrimAndRemoveComment(line);
if (line.empty()) {

View File

@ -60,6 +60,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/backup_engine.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_type.h"
@ -159,8 +160,10 @@ IF_ROCKSDB_LITE("",
"randomtransaction,"
"randomreplacekeys,"
"timeseries,"
"getmergeoperands",
"getmergeoperands,",
"readrandomoperands,"
"backup,"
"restore"
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
@ -250,7 +253,10 @@ IF_ROCKSDB_LITE("",
"\treadrandomoperands -- read random keys using `GetMergeOperands()`. An "
"operation includes a rare but possible retry in case it got "
"`Status::Incomplete()`. This happens upon encountering more keys than "
"have ever been seen by the thread (or eight initially)\n");
"have ever been seen by the thread (or eight initially)\n"
"\tbackup -- Create a backup of the current DB and verify that a new backup is corrected. "
"Rate limit can be specified through --backup_rate_limit\n"
"\trestore -- Restore the DB from the latest backup available, rate limit can be specified through --restore_rate_limit\n");
DEFINE_int64(num, 1000000, "Number of key/values to place in database");
@ -1146,6 +1152,22 @@ DEFINE_bool(charge_table_reader, false,
"CacheEntryRoleOptions::charged of"
"CacheEntryRole::kBlockBasedTableReader");
DEFINE_uint64(backup_rate_limit, 0ull,
"If non-zero, db_bench will rate limit reads and writes for DB "
"backup. This "
"is the global rate in ops/second.");
DEFINE_uint64(restore_rate_limit, 0ull,
"If non-zero, db_bench will rate limit reads and writes for DB "
"restore. This "
"is the global rate in ops/second.");
DEFINE_string(backup_dir, "",
"If not empty string, use the given dir for backup.");
DEFINE_string(restore_dir, "",
"If not empty string, use the given dir for restore.");
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) {
assert(ctype);
@ -3512,6 +3534,12 @@ class Benchmark {
} else if (name == "readrandomoperands") {
read_operands_ = true;
method = &Benchmark::ReadRandom;
#ifndef ROCKSDB_LITE
} else if (name == "backup") {
method = &Benchmark::Backup;
} else if (name == "restore") {
method = &Benchmark::Restore;
#endif
} else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
ErrorExit();
@ -3544,6 +3572,13 @@ class Benchmark {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
#ifndef ROCKSDB_LITE
if (name == "backup") {
std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl;
} else if (name == "restore") {
std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl;
std::cout << "Restore path: [" << FLAGS_restore_dir << "]"
<< std::endl;
}
// A trace_file option can be provided both for trace and replay
// operations. But db_bench does not support tracing and replaying at
// the same time, for now. So, start tracing only when it is not a
@ -8224,6 +8259,47 @@ class Benchmark {
}
}
void Backup(ThreadState* thread) {
DB* db = SelectDB(thread);
std::unique_ptr<BackupEngineOptions> engine_options(
new BackupEngineOptions(FLAGS_backup_dir));
Status s;
BackupEngine* backup_engine;
if (FLAGS_backup_rate_limit > 0) {
engine_options->backup_rate_limiter.reset(NewGenericRateLimiter(
FLAGS_backup_rate_limit, 100000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo));
}
// Build new backup of the entire DB
engine_options->destroy_old_data = true;
s = BackupEngine::Open(FLAGS_env, *engine_options, &backup_engine);
assert(s.ok());
s = backup_engine->CreateNewBackup(db);
assert(s.ok());
std::vector<BackupInfo> backup_info;
backup_engine->GetBackupInfo(&backup_info);
// Verify that a new backup is created
assert(backup_info.size() == 1);
}
void Restore(ThreadState* /* thread */) {
std::unique_ptr<BackupEngineOptions> engine_options(
new BackupEngineOptions(FLAGS_backup_dir));
if (FLAGS_restore_rate_limit > 0) {
engine_options->restore_rate_limiter.reset(NewGenericRateLimiter(
FLAGS_restore_rate_limit, 100000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo));
}
BackupEngineReadOnly* backup_engine;
Status s =
BackupEngineReadOnly::Open(FLAGS_env, *engine_options, &backup_engine);
assert(s.ok());
s = backup_engine->RestoreDBFromLatestBackup(FLAGS_restore_dir,
FLAGS_restore_dir);
assert(s.ok());
delete backup_engine;
}
#endif // ROCKSDB_LITE
};
@ -8369,6 +8445,14 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_db = default_db_path;
}
if (FLAGS_backup_dir.empty()) {
FLAGS_backup_dir = FLAGS_db + "/backup";
}
if (FLAGS_restore_dir.empty()) {
FLAGS_restore_dir = FLAGS_db + "/restore";
}
if (FLAGS_stats_interval_seconds > 0) {
// When both are set then FLAGS_stats_interval determines the frequency
// at which the timer is checked for FLAGS_stats_interval_seconds

View File

@ -2578,8 +2578,9 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
const auto& fs = options.env->GetFileSystem();
FileOptions soptions(options);
std::unique_ptr<SequentialFileReader> wal_file_reader;
Status status = SequentialFileReader::Create(fs, wal_file, soptions,
&wal_file_reader, nullptr);
Status status = SequentialFileReader::Create(
fs, wal_file, soptions, &wal_file_reader, nullptr /* dbg */,
nullptr /* rate_limiter */);
if (!status.ok()) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " +

View File

@ -160,7 +160,8 @@ class TraceAnalyzerTest : public testing::Test {
std::vector<std::string> result;
std::string line;
while (lf_reader.ReadLine(&line)) {
while (
lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
result.push_back(line);
}

View File

@ -1054,7 +1054,8 @@ Status TraceAnalyzer::ReProcessing() {
LineFileReader lf_reader(
std::move(file), whole_key_path,
kTraceFileReadaheadSize /* filereadahead_size */);
for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(&get_key);
for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(
&get_key, Env::IO_TOTAL /* rate_limiter_priority */);
++cfs_[cf_id].w_count) {
input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
for (int type = 0; type < kTaTypeNum; type++) {

View File

@ -597,7 +597,8 @@ class ReadaheadSequentialFileTest : public testing::Test,
ReadaheadSequentialFileTest() {}
std::string Read(size_t n) {
Slice result;
Status s = test_read_holder_->Read(n, &result, scratch_.get());
Status s = test_read_holder_->Read(
n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/);
EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
return std::string(result.data(), result.size());
}
@ -724,10 +725,11 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
{
std::unique_ptr<LineFileReader> reader;
ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
nullptr));
nullptr /* dbg */,
nullptr /* rate_limiter */));
std::string line;
int count = 0;
while (reader->ReadLine(&line)) {
while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
@ -736,7 +738,8 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
ASSERT_EQ(count, nlines);
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
// And still
ASSERT_FALSE(reader->ReadLine(&line));
ASSERT_FALSE(
reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
ASSERT_OK(reader->GetStatus());
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
}
@ -745,12 +748,14 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
{
std::unique_ptr<LineFileReader> reader;
ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
nullptr));
nullptr /* dbg */,
nullptr /* rate_limiter */));
std::string line;
int count = 0;
// Read part way through the file
while (count < nlines / 4) {
ASSERT_TRUE(reader->ReadLine(&line));
ASSERT_TRUE(
reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
@ -767,7 +772,7 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
});
SyncPoint::GetInstance()->EnableProcessing();
while (reader->ReadLine(&line)) {
while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
ASSERT_EQ(line, GenerateLine(count));
++count;
ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
@ -777,7 +782,8 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
ASSERT_EQ(callback_count, 1);
// Still get error & no retry
ASSERT_FALSE(reader->ReadLine(&line));
ASSERT_FALSE(
reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
ASSERT_TRUE(reader->GetStatus().IsCorruption());
ASSERT_EQ(callback_count, 1);

View File

@ -1922,7 +1922,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */,
file_info->temp, "" /* contents */, backup_env_, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
options_.restore_rate_limiter.get(), 0 /* size_limit */,
options_.restore_rate_limiter.get(), file_info->size,
nullptr /* stats */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst,
@ -2104,7 +2104,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
// Return back current temperature in FileSystem
*src_temperature = src_file->GetTemperature();
src_reader.reset(new SequentialFileReader(std::move(src_file), src));
src_reader.reset(new SequentialFileReader(
std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter));
buf.reset(new char[buf_size]);
}
@ -2116,11 +2117,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
if (!src.empty()) {
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
io_s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
Env::IO_LOW /* rate_limiter_priority */);
*bytes_toward_next_callback += data.size();
} else {
data = contents;
@ -2421,16 +2419,14 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
std::unique_ptr<SequentialFileReader> src_reader;
auto file_options = FileOptions(src_env_options);
file_options.temperature = src_temperature;
IOStatus io_s = SequentialFileReader::Create(src_fs, src, file_options,
&src_reader, nullptr);
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
IOStatus io_s = SequentialFileReader::Create(
src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter);
if (!io_s.ok()) {
return io_s;
}
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
size_t buf_size =
rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
: kDefaultCopyFileBufferSize;
size_t buf_size = kDefaultCopyFileBufferSize;
std::unique_ptr<char[]> buf(new char[buf_size]);
Slice data;
@ -2440,11 +2436,8 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
}
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
io_s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
Env::IO_LOW /* rate_limiter_priority */);
if (!io_s.ok()) {
return io_s;
}
@ -2847,7 +2840,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
std::unique_ptr<LineFileReader> backup_meta_reader;
{
IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
&backup_meta_reader, nullptr);
&backup_meta_reader,
nullptr /* dbg */, rate_limiter);
if (!io_s.ok()) {
return io_s;
}
@ -2859,12 +2853,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
// Failures handled at the end
std::string line;
if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (backup_meta_reader->ReadLine(&line,
Env::IO_LOW /* rate_limiter_priority */)) {
if (StartsWith(line, kSchemaVersionPrefix)) {
std::string ver = line.substr(kSchemaVersionPrefix.size());
if (ver == "2" || StartsWith(ver, "2.")) {
@ -2880,29 +2870,17 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
if (!line.empty()) {
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
} else if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
} else if (backup_meta_reader->ReadLine(
&line, Env::IO_LOW /* rate_limiter_priority */)) {
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (backup_meta_reader->ReadLine(&line,
Env::IO_LOW /* rate_limiter_priority */)) {
sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
uint32_t num_files = UINT32_MAX;
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
while (backup_meta_reader->ReadLine(
&line, Env::IO_LOW /* rate_limiter_priority */)) {
if (line.empty()) {
return IOStatus::Corruption("Unexpected empty line");
}
@ -2941,12 +2919,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
std::vector<std::shared_ptr<FileInfo>> files;
bool footer_present = false;
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
while (backup_meta_reader->ReadLine(
&line, Env::IO_LOW /* rate_limiter_priority */)) {
std::vector<std::string> components = StringSplit(line, ' ');
if (components.size() < 1) {
@ -3046,12 +3020,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
if (footer_present) {
assert(schema_major_version >= 2);
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
nullptr /* stats */,
RateLimiter::OpType::kRead);
}
while (backup_meta_reader->ReadLine(
&line, Env::IO_LOW /* rate_limiter_priority */)) {
if (line.empty()) {
return IOStatus::Corruption("Unexpected empty line");
}