mirror of https://github.com/facebook/rocksdb.git
Pass rate_limiter_priority from SequentialFileReader to FS (#12296)
Summary: **Context/Summary:** The rate_limiter_priority passed to SequentialFileReader is now passed down to underlying file system. This allows the priority associated with backup/restore SST reads to be exposed to FS. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12296 Test Plan: - Modified existing UT Reviewed By: pdillinger Differential Revision: D53100368 Pulled By: hx235 fbshipit-source-id: b4a28917efbb1b0d16f9d1c2b38769bffcff0f34
This commit is contained in:
parent
f046a8f617
commit
a31fded253
|
@ -16,6 +16,7 @@
|
|||
#include "monitoring/histogram.h"
|
||||
#include "monitoring/iostats_context_imp.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/file_system.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/aligned_buffer.h"
|
||||
#include "util/random.h"
|
||||
|
@ -38,6 +39,8 @@ IOStatus SequentialFileReader::Create(
|
|||
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
|
||||
Env::IOPriority rate_limiter_priority) {
|
||||
IOStatus io_s;
|
||||
IOOptions io_opts;
|
||||
io_opts.rate_limiter_priority = rate_limiter_priority;
|
||||
if (use_direct_io()) {
|
||||
//
|
||||
// |-offset_advance-|---bytes returned--|
|
||||
|
@ -76,7 +79,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
|
|||
start_ts = FileOperationInfo::StartNow();
|
||||
}
|
||||
io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
|
||||
IOOptions(), &tmp, buf.Destination(),
|
||||
io_opts, &tmp, buf.Destination(),
|
||||
nullptr /* dbg */);
|
||||
if (ShouldNotifyListeners()) {
|
||||
auto finish_ts = FileOperationInfo::FinishNow();
|
||||
|
@ -119,7 +122,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
|
|||
start_ts = FileOperationInfo::StartNow();
|
||||
}
|
||||
Slice tmp;
|
||||
io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
|
||||
io_s = file_->Read(allowed, io_opts, &tmp, scratch + read,
|
||||
nullptr /* dbg */);
|
||||
if (ShouldNotifyListeners()) {
|
||||
auto finish_ts = FileOperationInfo::FinishNow();
|
||||
|
|
|
@ -181,6 +181,40 @@ class TestFs : public FileSystemWrapper {
|
|||
bool fail_reads_;
|
||||
};
|
||||
|
||||
class CheckIOOptsSequentialFile : public FSSequentialFileOwnerWrapper {
|
||||
public:
|
||||
CheckIOOptsSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
|
||||
const std::string& file_name)
|
||||
: FSSequentialFileOwnerWrapper(std::move(f)) {
|
||||
is_sst_file_ = file_name.find(".sst") != std::string::npos;
|
||||
}
|
||||
|
||||
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
|
||||
char* scratch, IODebugContext* dbg) override {
|
||||
// Backup currently associates only SST read with rate limiter priority
|
||||
assert(!is_sst_file_ || options.rate_limiter_priority ==
|
||||
kExpectedBackupReadRateLimiterPri);
|
||||
IOStatus rv = target()->Read(n, options, result, scratch, dbg);
|
||||
return rv;
|
||||
}
|
||||
|
||||
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
|
||||
Slice* result, char* scratch,
|
||||
IODebugContext* dbg) override {
|
||||
// Backup currently associates only SST read with rate limiter priority
|
||||
assert(!is_sst_file_ || options.rate_limiter_priority ==
|
||||
kExpectedBackupReadRateLimiterPri);
|
||||
IOStatus rv =
|
||||
target()->PositionedRead(offset, n, options, result, scratch, dbg);
|
||||
return rv;
|
||||
}
|
||||
|
||||
private:
|
||||
static const Env::IOPriority kExpectedBackupReadRateLimiterPri =
|
||||
Env::IO_LOW;
|
||||
bool is_sst_file_;
|
||||
};
|
||||
|
||||
IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
|
||||
std::unique_ptr<FSSequentialFile>* r,
|
||||
IODebugContext* dbg) override {
|
||||
|
@ -189,6 +223,14 @@ class TestFs : public FileSystemWrapper {
|
|||
r->reset(
|
||||
new TestFs::DummySequentialFile(dummy_sequential_file_fail_reads_));
|
||||
return IOStatus::OK();
|
||||
} else if (check_iooptions_sequential_file_) {
|
||||
std::unique_ptr<FSSequentialFile> file;
|
||||
IOStatus s =
|
||||
FileSystemWrapper::NewSequentialFile(f, file_opts, &file, dbg);
|
||||
if (s.ok()) {
|
||||
r->reset(new TestFs::CheckIOOptsSequentialFile(std::move(file), f));
|
||||
}
|
||||
return s;
|
||||
} else {
|
||||
IOStatus s = FileSystemWrapper::NewSequentialFile(f, file_opts, r, dbg);
|
||||
if (s.ok()) {
|
||||
|
@ -292,6 +334,11 @@ class TestFs : public FileSystemWrapper {
|
|||
dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads;
|
||||
}
|
||||
|
||||
void SetCheckIOOptionsSequentialFile(bool check_iooptions_sequential_file) {
|
||||
MutexLock l(&mutex_);
|
||||
check_iooptions_sequential_file_ = check_iooptions_sequential_file;
|
||||
}
|
||||
|
||||
void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; }
|
||||
IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
|
||||
std::vector<std::string>* r,
|
||||
|
@ -387,6 +434,7 @@ class TestFs : public FileSystemWrapper {
|
|||
port::Mutex mutex_;
|
||||
bool dummy_sequential_file_ = false;
|
||||
bool dummy_sequential_file_fail_reads_ = false;
|
||||
bool check_iooptions_sequential_file_ = false;
|
||||
std::vector<std::string> written_files_;
|
||||
std::vector<std::string> filenames_for_mocked_attrs_;
|
||||
uint64_t limit_written_files_ = 1000000;
|
||||
|
@ -1184,7 +1232,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
|
|||
// restore)
|
||||
// options_.db_paths.emplace_back(dbname_, 500 * 1024);
|
||||
// options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
|
||||
|
||||
test_db_fs_->SetCheckIOOptionsSequentialFile(true);
|
||||
test_backup_fs_->SetCheckIOOptionsSequentialFile(true);
|
||||
OpenDBAndBackupEngine(true);
|
||||
// write some data, backup, repeat
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
|
@ -1241,6 +1290,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
|
|||
AssertBackupConsistency(0, 0, 3 * keys_iteration, max_key);
|
||||
|
||||
CloseBackupEngine();
|
||||
test_db_fs_->SetCheckIOOptionsSequentialFile(false);
|
||||
test_backup_fs_->SetCheckIOOptionsSequentialFile(false);
|
||||
}
|
||||
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
|
||||
|
||||
|
|
Loading…
Reference in New Issue