Set Read rate limiter priority dynamically and pass it to FS (#9996)

Summary:
### Context:
Background compactions and flushes generate large reads and writes and can be long-running, especially for universal compaction. In some cases, this can impact users' foreground reads and writes.

### Solution
User, Flush, and Compaction reads share some code paths. This change sets rate_limiter_priority in ReadOptions along those paths (e.g. FindTable(), mainly via BlockBasedTable::Open(), and the various iterators) and then propagates it into the rate_limiter_priority of the IOOptions passed to FSRandomAccessFile.

**This PR is for the Read path.** The dynamic **Read** priority for each state is listed below:

| State | Normal | Delayed | Stalled |
| ----- | ------ | ------- | ------- |
|  Flush (verification read in BuildTable()) | IO_USER | IO_USER | IO_USER |
|  Compaction | IO_LOW  | IO_USER | IO_USER |
|  User | User provided | User provided | User provided |

We respect the read_options that the user provided and do not override it.
The only SST read for Flush is the verification read in BuildTable(), which is regarded as a user read.
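
The table above can be read as a small decision function. The following is a minimal sketch under stated assumptions, not the code in this PR: `IOPriority`, `WriteState`, `ReadSource`, and `PickReadPriority` are hypothetical stand-ins for `Env::IOPriority` and for the state reported by the WriteController.

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only. The real priority type is
// Env::IOPriority, and the write state comes from the WriteController.
enum class IOPriority { kLow, kUser };
enum class WriteState { kNormal, kDelayed, kStalled };
enum class ReadSource { kFlush, kCompaction, kUser };

// Returns the rate limiter priority for a read, following the table above.
// For user reads, the caller-supplied priority is returned unchanged.
inline IOPriority PickReadPriority(ReadSource source, WriteState state,
                                   IOPriority user_provided) {
  switch (source) {
    case ReadSource::kFlush:
      // The verification read in BuildTable() is treated as a user read.
      return IOPriority::kUser;
    case ReadSource::kCompaction:
      // Low priority only while writes are neither delayed nor stalled.
      return state == WriteState::kNormal ? IOPriority::kLow
                                          : IOPriority::kUser;
    case ReadSource::kUser:
      return user_provided;
  }
  return user_provided;  // Not reached for valid enum values.
}
```

For example, `PickReadPriority(ReadSource::kCompaction, WriteState::kStalled, IOPriority::kLow)` yields `IOPriority::kUser`, which matches the Compaction row's Stalled column.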

**Details**
1. Set read_options.rate_limiter_priority dynamically:
- User: Do not update the read_options. Use the read_options that the user provided.
- Compaction: Update read_options in CompactionJob::ProcessKeyValueCompaction().
- Flush: Update read_options in BuildTable().

2. Pass the rate limiter priority to FSRandomAccessFile functions:
- After FindTable() is called, read_options is passed through GetTableReader() (table_cache.cc), BlockBasedTableFactory::NewTableReader() (block_based_table_factory.cc), and BlockBasedTable::Open(). Open() needs updates to its ReadOptions variable, and so do the functions it calls, including PrefetchTail(), PrepareIOOptions(), ReadFooterFromFile(), ReadMetaIndexBlock(), ReadPropertiesBlock(), PrefetchIndexAndFilterBlocks(), and ReadRangeDelBlock().
- In RandomAccessFileReader, the functions to be updated include Read(), MultiRead(), ReadAsync(), and Prefetch().
- Update the downstream functions of NewIndexIterator(), NewDataBlockIterator(), and BlockBasedTableIterator().
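
The propagation described in the steps above can be sketched as follows. This is a simplified illustration, not RocksDB code: `ReadOptionsSketch`, `IOOptionsSketch`, `RecordingFile`, `PrepareIOSketch`, and `PrefetchWithPriority` are hypothetical names, and the `kTotal` default is assumed to play the role of `Env::IO_TOTAL`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for Env::IOPriority.
enum class IOPriority { kLow, kUser, kTotal };

// Hypothetical, trimmed-down versions of ReadOptions and IOOptions.
struct ReadOptionsSketch {
  IOPriority rate_limiter_priority = IOPriority::kTotal;
};
struct IOOptionsSketch {
  IOPriority rate_limiter_priority = IOPriority::kTotal;
};

// Hypothetical file object that records the priority it was called with,
// standing in for an FSRandomAccessFile implementation.
struct RecordingFile {
  IOPriority last_priority = IOPriority::kTotal;
  void Prefetch(uint64_t /*offset*/, size_t /*n*/,
                const IOOptionsSketch& opts) {
    last_priority = opts.rate_limiter_priority;
  }
};

// Copies the priority from the read options into the IO options, in the
// spirit of the one-line addition to PrepareIOFromReadOptions().
inline void PrepareIOSketch(const ReadOptionsSketch& ro,
                            IOOptionsSketch& opts) {
  opts.rate_limiter_priority = ro.rate_limiter_priority;
}

// Builds IO options from the read options and forwards them to the file,
// so the file system sees the priority chosen upstream.
inline void PrefetchWithPriority(RecordingFile& file, uint64_t offset,
                                 size_t n, const ReadOptionsSketch& ro) {
  IOOptionsSketch opts;
  PrepareIOSketch(ro, opts);
  file.Prefetch(offset, n, opts);
}
```

The point of the sketch is that the priority is chosen once, where the ReadOptions are built, and every layer below only forwards it.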

### Test Plans
Add unit tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9996

Reviewed By: anand1976

Differential Revision: D36452483

Pulled By: gitbw95

fbshipit-source-id: 60978204a4f849bb9261cb78d9bc1cb56d6008cf
gitbw95 2022-05-18 19:41:44 -07:00 committed by Facebook GitHub Bot
parent f1303bf8d8
commit 4da34b97ee
18 changed files with 73 additions and 47 deletions

@ -11,7 +11,7 @@
### New Features
* DB::GetLiveFilesStorageInfo is ready for production use.
* Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion.
* RemoteCompaction supports table_properties_collector_factories override on compaction worker.
* RemoteCompaction supports table_properties_collector_factories override on compaction worker.
### Public API changes
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
@ -30,7 +30,8 @@
### Behavior changes
* Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass.
* ldb `--try_load_options` default to true if `--db` is specified and not creating a new DB, the user can still explicitly disable that by `--try_load_options=false` (or explicitly enable that by `--try_load_options`).
* During Flush write or Compaction write, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system.
* During Flush write or Compaction write/read, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system.
## 7.2.0 (04/15/2022)
### Bug Fixes

@ -336,6 +336,7 @@ Status BuildTable(
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
ReadOptions read_options;
read_options.rate_limiter_priority = Env::IO_USER;
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
read_options, file_options, tboptions.internal_comparator, *meta,
nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor,

@ -1350,7 +1350,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
read_options.rate_limiter_priority = Env::IO_LOW;
read_options.rate_limiter_priority = GetRateLimiterPriority();
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,

@ -116,9 +116,8 @@ TEST_P(DBRateLimiterOnReadTest, Get) {
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
int expected = 0;
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
for (int i = 0; i < kNumFiles; ++i) {
{
std::string value;
@ -146,7 +145,8 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
const int kNumKeys = kNumFiles * kNumKeysPerFile;
{
@ -166,7 +166,7 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
ASSERT_TRUE(statuses[i].IsNotSupported());
}
}
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {
@ -177,10 +177,10 @@ TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
const int kNumKeys = kNumFiles * kNumKeysPerFile;
int expected = 0;
{
std::vector<std::string> key_bufs;
key_bufs.reserve(kNumKeys);
@ -207,10 +207,10 @@ TEST_P(DBRateLimiterOnReadTest, Iterator) {
}
Init();
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
std::unique_ptr<Iterator> iter(db_->NewIterator(GetReadOptions()));
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
int expected = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++expected;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
@ -236,12 +236,12 @@ TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
ASSERT_OK(db_->VerifyChecksum(GetReadOptions()));
// The files are tiny so there should have just been one read per file.
int expected = kNumFiles;
expected += kNumFiles;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
@ -251,11 +251,12 @@ TEST_P(DBRateLimiterOnReadTest, VerifyFileChecksums) {
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
// In Init(), compaction may request tokens for `Env::IO_USER`.
int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER);
ASSERT_OK(db_->VerifyFileChecksums(GetReadOptions()));
// The files are tiny so there should have just been one read per file.
int expected = kNumFiles;
expected += kNumFiles;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}

@ -3987,12 +3987,14 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
// should be slightly above 512KB due to non-data blocks read. Arbitrarily
// chose 1MB as the upper bound on the total bytes read.
size_t rate_limited_bytes =
options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL);
// There must be no charges at non-`IO_LOW` priorities.
size_t rate_limited_bytes = static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL));
// The charges can exist for `IO_LOW` and `IO_USER` priorities.
size_t rate_limited_bytes_by_pri =
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
ASSERT_EQ(rate_limited_bytes,
static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
static_cast<size_t>(rate_limited_bytes_by_pri));
// Include the explicit prefetch of the footer in direct I/O case.
size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
ASSERT_GE(
@ -4010,9 +4012,11 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
}
delete iter;
// bytes read for user iterator shouldn't count against the rate limit.
rate_limited_bytes_by_pri =
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
options.rate_limiter->GetTotalBytesThrough(Env::IO_USER);
ASSERT_EQ(rate_limited_bytes,
static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
static_cast<size_t>(rate_limited_bytes_by_pri));
}
}
}

@ -78,6 +78,8 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
(!opts.timeout.count() || ro.io_timeout < opts.timeout)) {
opts.timeout = ro.io_timeout;
}
opts.rate_limiter_priority = ro.rate_limiter_priority;
return IOStatus::OK();
}

@ -172,8 +172,11 @@ class RandomAccessFileReader {
size_t num_reqs, AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority) const;
IOStatus Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n, IOOptions(), nullptr);
IOStatus Prefetch(uint64_t offset, size_t n,
const Env::IOPriority rate_limiter_priority) const {
IOOptions opts;
opts.rate_limiter_priority = rate_limiter_priority;
return file_->Prefetch(offset, n, opts, nullptr);
}
FSRandomAccessFile* file() { return file_.get(); }

@ -47,7 +47,8 @@ InternalIteratorBase<IndexValue>* BinarySearchIndexReader::NewIterator(
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);

@ -234,7 +234,7 @@ void BlockBasedTableIterator::InitDataBlock() {
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
read_options_.async_io);
read_options_.async_io, read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,

@ -561,7 +561,7 @@ Status BlockBasedTable::Open(
Footer footer;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
// Only retain read_options.deadline and read_options.io_timeout.
// From read_options, retain deadline, io_timeout, and rate_limiter_priority.
// In future, we may retain more
// options. Specifically, we ignore verify_checksums and default to
// checksum verification anyway when creating the index and filter
@ -569,6 +569,7 @@ Status BlockBasedTable::Open(
ReadOptions ro;
ro.deadline = read_options.deadline;
ro.io_timeout = read_options.io_timeout;
ro.rate_limiter_priority = read_options.rate_limiter_priority;
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
@ -766,7 +767,8 @@ Status BlockBasedTable::PrefetchTail(
// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) {
if (!file->Prefetch(prefetch_off, prefetch_len, ro.rate_limiter_priority)
.IsNotSupported()) {
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */));
@ -778,6 +780,7 @@ Status BlockBasedTable::PrefetchTail(
prefetch_buffer->reset(
new FilePrefetchBuffer(0 /* readahead_size */, 0 /* max_readahead_size */,
true /* enable */, true /* track_min_offset */));
IOOptions opts;
Status s = file->PrepareIOOptions(ro, opts);
if (s.ok()) {

@ -8,13 +8,14 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_prefetcher.h"
#include "rocksdb/file_system.h"
#include "table/block_based/block_based_table_reader.h"
namespace ROCKSDB_NAMESPACE {
void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle,
size_t readahead_size,
bool is_for_compaction, bool async_io) {
void BlockPrefetcher::PrefetchIfNeeded(
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
const size_t readahead_size, bool is_for_compaction, const bool async_io,
const Env::IOPriority rate_limiter_priority) {
if (is_for_compaction) {
rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_,
@ -84,7 +85,8 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
// we can fallback to reading from disk if Prefetch fails.
Status s = rep->file->Prefetch(
handle.offset(),
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_);
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
rate_limiter_priority);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
max_auto_readahead_size,

@ -20,7 +20,8 @@ class BlockPrefetcher {
void PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle, size_t readahead_size,
bool is_for_compaction, bool async_io);
bool is_for_compaction, bool async_io,
Env::IOPriority rate_limiter_priority);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
void UpdateReadPattern(const uint64_t& offset, const size_t& len) {

@ -117,7 +117,8 @@ InternalIteratorBase<IndexValue>* HashIndexReader::NewIterator(
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);

@ -33,7 +33,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
}
Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
bool no_io, GetContext* get_context,
bool no_io, Env::IOPriority rate_limiter_priority, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const {
assert(index_block != nullptr);
@ -44,6 +44,7 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
}
ReadOptions read_options;
read_options.rate_limiter_priority = rate_limiter_priority;
if (no_io) {
read_options.read_tier = kBlockCacheTier;
}

@ -66,7 +66,8 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
}
Status GetOrReadIndexBlock(bool no_io, GetContext* get_context,
Status GetOrReadIndexBlock(bool no_io, Env::IOPriority rate_limiter_priority,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const;

@ -91,7 +91,8 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, partitioned_index_handle, read_options_.readahead_size,
is_for_compaction, read_options_.async_io);
is_for_compaction, read_options_.async_io,
read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(
read_options_, partitioned_index_handle, &block_iter_,

@ -49,7 +49,8 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority,
get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
@ -82,6 +83,7 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
ro.io_timeout = read_options.io_timeout;
ro.adaptive_readahead = read_options.adaptive_readahead;
ro.async_io = read_options.async_io;
ro.rate_limiter_priority = read_options.rate_limiter_priority;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
@ -119,8 +121,9 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
CachableEntry<Block> index_block;
{
Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
&lookup_context, &index_block);
Status s = GetOrReadIndexBlock(false /* no_io */, ro.rate_limiter_priority,
nullptr /* get_context */, &lookup_context,
&index_block);
if (!s.ok()) {
return s;
}

@ -372,17 +372,17 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
// TODO: rate limit footer reads.
if (prefetch_buffer == nullptr ||
!prefetch_buffer->TryReadFromCache(
IOOptions(), file, read_offset, Footer::kMaxEncodedLength,
&footer_input, nullptr, Env::IO_TOTAL /* rate_limiter_priority */)) {
opts, file, read_offset, Footer::kMaxEncodedLength, &footer_input,
nullptr, opts.rate_limiter_priority)) {
if (file->use_direct_io()) {
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
&footer_input, nullptr, &internal_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
opts.rate_limiter_priority);
} else {
footer_buf.reserve(Footer::kMaxEncodedLength);
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
&footer_input, &footer_buf[0], nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
opts.rate_limiter_priority);
}
if (!s.ok()) return s;
}