diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc index a753c1d098..ac2f37b0d0 100644 --- a/file/sequence_file_reader.cc +++ b/file/sequence_file_reader.cc @@ -16,6 +16,7 @@ #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "rocksdb/file_system.h" #include "test_util/sync_point.h" #include "util/aligned_buffer.h" #include "util/random.h" @@ -38,6 +39,8 @@ IOStatus SequentialFileReader::Create( IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch, Env::IOPriority rate_limiter_priority) { IOStatus io_s; + IOOptions io_opts; + io_opts.rate_limiter_priority = rate_limiter_priority; if (use_direct_io()) { // // |-offset_advance-|---bytes returned--| @@ -76,7 +79,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch, start_ts = FileOperationInfo::StartNow(); } io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed, - IOOptions(), &tmp, buf.Destination(), + io_opts, &tmp, buf.Destination(), nullptr /* dbg */); if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -119,7 +122,7 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch, start_ts = FileOperationInfo::StartNow(); } Slice tmp; - io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read, + io_s = file_->Read(allowed, io_opts, &tmp, scratch + read, nullptr /* dbg */); if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); diff --git a/utilities/backup/backup_engine_test.cc b/utilities/backup/backup_engine_test.cc index d21db35561..e162799ee6 100644 --- a/utilities/backup/backup_engine_test.cc +++ b/utilities/backup/backup_engine_test.cc @@ -181,6 +181,40 @@ class TestFs : public FileSystemWrapper { bool fail_reads_; }; + class CheckIOOptsSequentialFile : public FSSequentialFileOwnerWrapper { + public: + CheckIOOptsSequentialFile(std::unique_ptr&& f, + const std::string& file_name) + : FSSequentialFileOwnerWrapper(std::move(f)) { + is_sst_file_ = file_name.find(".sst") != std::string::npos; + } + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override { + // Backup currently associates only SST read with rate limiter priority + assert(!is_sst_file_ || options.rate_limiter_priority == + kExpectedBackupReadRateLimiterPri); + IOStatus rv = target()->Read(n, options, result, scratch, dbg); + return rv; + } + + IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) override { + // Backup currently associates only SST read with rate limiter priority + assert(!is_sst_file_ || options.rate_limiter_priority == + kExpectedBackupReadRateLimiterPri); + IOStatus rv = + target()->PositionedRead(offset, n, options, result, scratch, dbg); + return rv; + } + + private: + static const Env::IOPriority kExpectedBackupReadRateLimiterPri = + Env::IO_LOW; + bool is_sst_file_; + }; + IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts, std::unique_ptr* r, IODebugContext* dbg) override { @@ -189,6 +223,14 @@ class TestFs : public FileSystemWrapper { r->reset( new TestFs::DummySequentialFile(dummy_sequential_file_fail_reads_)); return IOStatus::OK(); + } else if (check_iooptions_sequential_file_) { + std::unique_ptr file; + IOStatus s = + FileSystemWrapper::NewSequentialFile(f, file_opts, &file, dbg); + if (s.ok()) { + r->reset(new TestFs::CheckIOOptsSequentialFile(std::move(file), f)); + } + return s; } else { IOStatus s = FileSystemWrapper::NewSequentialFile(f, file_opts, r, dbg); if (s.ok()) { @@ -292,6 +334,11 @@ class TestFs : public FileSystemWrapper { dummy_sequential_file_fail_reads_ = dummy_sequential_file_fail_reads; } + void SetCheckIOOptionsSequentialFile(bool check_iooptions_sequential_file) { + MutexLock l(&mutex_); + check_iooptions_sequential_file_ = check_iooptions_sequential_file; + } + void SetGetChildrenFailure(bool fail) { get_children_failure_ = fail; } IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts, std::vector* r, @@ -387,6 +434,7 @@ class TestFs : public FileSystemWrapper { port::Mutex mutex_; bool dummy_sequential_file_ = false; bool dummy_sequential_file_fail_reads_ = false; + bool check_iooptions_sequential_file_ = false; std::vector written_files_; std::vector filenames_for_mocked_attrs_; uint64_t limit_written_files_ = 1000000; @@ -1184,7 +1232,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) { // restore) // options_.db_paths.emplace_back(dbname_, 500 * 1024); // options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024); - + test_db_fs_->SetCheckIOOptionsSequentialFile(true); + test_backup_fs_->SetCheckIOOptionsSequentialFile(true); OpenDBAndBackupEngine(true); // write some data, backup, repeat for (int i = 0; i < 5; ++i) { @@ -1241,6 +1290,8 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) { AssertBackupConsistency(0, 0, 3 * keys_iteration, max_key); CloseBackupEngine(); + test_db_fs_->SetCheckIOOptionsSequentialFile(false); + test_backup_fs_->SetCheckIOOptionsSequentialFile(false); } #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)