Pass rate_limiter_priority from SequentialFileReader to FS (#12296)

Summary:
**Context/Summary:**
The rate_limiter_priority passed to SequentialFileReader is now passed down to underlying file system. This allows the priority associated with backup/restore SST reads to be exposed to FS.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12296

Test Plan: - Modified existing UT

Reviewed By: pdillinger

Differential Revision: D53100368

Pulled By: hx235

fbshipit-source-id: b4a28917efbb1b0d16f9d1c2b38769bffcff0f34
This commit is contained in:
Hui Xiao 2024-01-25 18:20:31 -08:00
parent 36797a43fe
commit cc20520d5f
2 changed files with 57 additions and 3 deletions

View file

@ -16,6 +16,7 @@
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
@ -38,6 +39,8 @@ IOStatus SequentialFileReader::Create(
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
Env::IOPriority rate_limiter_priority) {
IOStatus io_s;
IOOptions io_opts;
io_opts.rate_limiter_priority = rate_limiter_priority;
if (use_direct_io()) {
//
// |-offset_advance-|---bytes returned--|
@ -76,7 +79,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
start_ts = FileOperationInfo::StartNow();
}
io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
IOOptions(), &tmp, buf.Destination(),
io_opts, &tmp, buf.Destination(),
nullptr /* dbg */);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
@ -119,7 +122,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
start_ts = FileOperationInfo::StartNow();
}
Slice tmp;
io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
io_s = file_->Read(allowed, io_opts, &tmp, scratch + read,
nullptr /* dbg */);
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();

View file

@ -181,6 +181,40 @@ class TestFs : public FileSystemWrapper {
bool fail_reads_;
};
class CheckIOOptsSequentialFile : public FSSequentialFileOwnerWrapper {
public:
CheckIOOptsSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
const std::string& file_name)
: FSSequentialFileOwnerWrapper(std::move(f)) {
is_sst_file_ = file_name.find(".sst") != std::string::npos;
}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override {
// Backup currently associates only SST read with rate limiter priority
assert(!is_sst_file_ || options.rate_limiter_priority ==
kExpectedBackupReadRateLimiterPri);
IOStatus rv = target()->Read(n, options, result, scratch, dbg);
return rv;
}
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) override {
// Backup currently associates only SST read with rate limiter priority
assert(!is_sst_file_ || options.rate_limiter_priority ==
kExpectedBackupReadRateLimiterPri);
IOStatus rv =
target()->PositionedRead(offset, n, options, result, scratch, dbg);
return rv;
}
private:
static const Env::IOPriority kExpectedBackupReadRateLimiterPri =
Env::IO_LOW;
bool is_sst_file_;
};
IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* dbg) override {
@ -189,6 +223,14 @@ class TestFs : public FileSystemWrapper {
r->reset(
new TestFs::DummySequentialFile(dummy_sequential_file_fail_reads_));
return IOStatus::OK();
} else if (check_iooptions_sequential_file_) {
std::unique_ptr<FSSequentialFile> file;
IOStatus s =
FileSystemWrapper::NewSequentialFile(f, file_opts, &file, dbg);
if (s.ok()) {
r->reset(new TestFs::CheckIOOptsSequentialFile(std::move(file), f));
}
return s;
} else {
IOStatus s = FileSystemWrapper::NewSequentialFile(f, file_opts, r, dbg);
if (s.ok()) {
@ -292,6 +334,11 @@ class TestFs : public FileSystemWrapper {
dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads;
}
void SetCheckIOOptionsSequentialFile(bool check_iooptions_sequential_file) {
MutexLock l(&mutex_);
check_iooptions_sequential_file_ = check_iooptions_sequential_file;
}
void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; }
IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts,
std::vector<std::string>* r,
@ -387,6 +434,7 @@ class TestFs : public FileSystemWrapper {
port::Mutex mutex_;
bool dummy_sequential_file_ = false;
bool dummy_sequential_file_fail_reads_ = false;
bool check_iooptions_sequential_file_ = false;
std::vector<std::string> written_files_;
std::vector<std::string> filenames_for_mocked_attrs_;
uint64_t limit_written_files_ = 1000000;
@ -1184,7 +1232,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
// restore)
// options_.db_paths.emplace_back(dbname_, 500 * 1024);
// options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
test_db_fs_->SetCheckIOOptionsSequentialFile(true);
test_backup_fs_->SetCheckIOOptionsSequentialFile(true);
OpenDBAndBackupEngine(true);
// write some data, backup, repeat
for (int i = 0; i < 5; ++i) {
@ -1241,6 +1290,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
AssertBackupConsistency(0, 0, 3 * keys_iteration, max_key);
CloseBackupEngine();
test_db_fs_->SetCheckIOOptionsSequentialFile(false);
test_backup_fs_->SetCheckIOOptionsSequentialFile(false);
}
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)