Group rocksdb.sst.read.micros stat by different user read IOActivity + misc (#11444)

Summary:
**Context/Summary:**
- Similar to https://github.com/facebook/rocksdb/pull/11288 but for user read such as `Get(), MultiGet(), DBIterator::XXX(), Verify(File)Checksum()`.
   - For this, I refactored some user-facing `MultiGet` calls in `TransactionBase` and various types of `DB` so that it does not call a user-facing `Get()` but `GetImpl()` for passing the `ReadOptions::io_activity` check (see PR conversation)
   - New user read stats breakdown are guarded by `kExceptDetailedTimers` since measurement shows they have 4-5% regression to the upstream/main.

- Misc
   - More refactoring: with https://github.com/facebook/rocksdb/pull/11288, we complete passing `ReadOptions/IOOptions` to FS level. So we can now replace the previously [added](https://github.com/facebook/rocksdb/pull/9424) `rate_limiter_priority` parameter in `RandomAccessFileReader`'s `Read/MultiRead/Prefetch()` with `IOOptions::rate_limiter_priority`
   - Also, `ReadAsync()` call time is measured in `SST_READ_MICRO` now

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11444

Test Plan:
- CI fake db crash/stress test
- Microbenchmarking

**Build** `make clean && ROCKSDB_NO_FBCODE=1 DEBUG_LEVEL=0 make -jN db_basic_bench`
- google benchmark version: 604f6fd3f4
- db_basic_bench_base: upstream
- db_basic_bench_pr: db_basic_bench_base + this PR
- asyncread_db_basic_bench_base: upstream + [db basic bench patch for IteratorNext](https://github.com/facebook/rocksdb/compare/main...hx235:rocksdb:micro_bench_async_read)
- asyncread_db_basic_bench_pr: asyncread_db_basic_bench_base + this PR

**Test**

Get
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{null_stat|base|pr} --benchmark_filter=DBGet/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1/negative_query:0/enable_filter:0/mmap:1/threads:1 --benchmark_repetitions=1000
```

Result
```
Coming soon
```

AsyncRead
```
TEST_TMPDIR=/dev/shm ./asyncread_db_basic_bench_{base|pr} --benchmark_filter=IteratorNext/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1/async_io:1/include_detailed_timers:0 --benchmark_repetitions=1000 > syncread_db_basic_bench_{base|pr}.out
```

Result
```
Base:
1956,1956,1968,1977,1979,1986,1988,1988,1988,1990,1991,1991,1993,1993,1993,1993,1994,1996,1997,1997,1997,1998,1999,2001,2001,2002,2004,2007,2007,2008,

PR (2.3% regression, due to measuring `SST_READ_MICRO` that wasn't measured before):
1993,2014,2016,2022,2024,2027,2027,2028,2028,2030,2031,2031,2032,2032,2038,2039,2042,2044,2044,2047,2047,2047,2048,2049,2050,2052,2052,2052,2053,2053,
```

Reviewed By: ajkr

Differential Revision: D45918925

Pulled By: hx235

fbshipit-source-id: 58a54560d9ebeb3a59b6d807639692614dad058a
This commit is contained in:
Hui Xiao 2023-08-08 17:26:50 -07:00 committed by Facebook GitHub Bot
parent 9c2ebcc2c3
commit 9a034801ce
83 changed files with 1127 additions and 494 deletions

View File

@ -154,11 +154,9 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
constexpr uint64_t read_offset = 0;
constexpr size_t read_size = BlobLogHeader::kSize;
// TODO: rate limit reading headers from blob files.
const Status s =
ReadFromFile(file_reader, read_options, read_offset, read_size,
statistics, &header_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
statistics, &header_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
@ -207,11 +205,9 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
const uint64_t read_offset = file_size - BlobLogFooter::kSize;
constexpr size_t read_size = BlobLogFooter::kSize;
// TODO: rate limit reading footers from blob files.
const Status s =
ReadFromFile(file_reader, read_options, read_offset, read_size,
statistics, &footer_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
statistics, &footer_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
@ -242,8 +238,7 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint64_t read_offset, size_t read_size,
Statistics* statistics, Slice* slice,
Buffer* buf, AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority) {
Buffer* buf, AlignedBuf* aligned_buf) {
assert(slice);
assert(buf);
assert(aligned_buf);
@ -264,13 +259,13 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
constexpr char* scratch = nullptr;
s = file_reader->Read(io_options, read_offset, read_size, slice, scratch,
aligned_buf, rate_limiter_priority);
aligned_buf);
} else {
buf->reset(new char[read_size]);
constexpr AlignedBuf* aligned_scratch = nullptr;
s = file_reader->Read(io_options, read_offset, read_size, slice, buf->get(),
aligned_scratch, rate_limiter_priority);
aligned_scratch);
}
if (!s.ok()) {
@ -345,8 +340,7 @@ Status BlobFileReader::GetBlob(
}
prefetched = prefetch_buffer->TryReadFromCache(
io_options, file_reader_.get(), record_offset,
static_cast<size_t>(record_size), &record_slice, &s,
read_options.rate_limiter_priority, for_compaction);
static_cast<size_t>(record_size), &record_slice, &s, for_compaction);
if (!s.ok()) {
return s;
}
@ -357,10 +351,10 @@ Status BlobFileReader::GetBlob(
PERF_COUNTER_ADD(blob_read_count, 1);
PERF_COUNTER_ADD(blob_read_byte, record_size);
PERF_TIMER_GUARD(blob_read_time);
const Status s = ReadFromFile(
file_reader_.get(), read_options, record_offset,
static_cast<size_t>(record_size), statistics_, &record_slice, &buf,
&aligned_buf, read_options.rate_limiter_priority);
const Status s =
ReadFromFile(file_reader_.get(), read_options, record_offset,
static_cast<size_t>(record_size), statistics_,
&record_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
@ -468,9 +462,12 @@ void BlobFileReader::MultiGetBlob(
TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
PERF_COUNTER_ADD(blob_read_count, num_blobs);
PERF_COUNTER_ADD(blob_read_byte, total_len);
s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
direct_io ? &aligned_buf : nullptr,
read_options.rate_limiter_priority);
IOOptions opts;
s = file_reader_->PrepareIOOptions(read_options, opts);
if (s.ok()) {
s = file_reader_->MultiRead(opts, read_reqs.data(), read_reqs.size(),
direct_io ? &aligned_buf : nullptr);
}
if (!s.ok()) {
for (auto& req : read_reqs) {
req.status.PermitUncheckedError();

View File

@ -89,8 +89,7 @@ class BlobFileReader {
const ReadOptions& read_options,
uint64_t read_offset, size_t read_size,
Statistics* statistics, Slice* slice, Buffer* buf,
AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority);
AlignedBuf* aligned_buf);
static Status VerifyBlob(const Slice& record_slice, const Slice& user_key,
uint64_t value_size);

View File

@ -29,9 +29,8 @@ Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
// TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?)
Status s =
file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size), slice,
buf, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
slice, buf, nullptr);
next_byte_ += size;
if (!s.ok()) {
return s;

View File

@ -7,6 +7,7 @@
#include "rocksdb/convenience.h"
#include "db/convenience_impl.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
@ -38,6 +39,22 @@ Status VerifySstFileChecksum(const Options& options,
return VerifySstFileChecksum(options, env_options, read_options, file_path);
}
Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const ReadOptions& _read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno) {
if (_read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Can only call VerifySstFileChecksum with `ReadOptions::io_activity` "
"is "
"`Env::IOActivity::kUnknown`");
}
ReadOptions read_options(_read_options);
return VerifySstFileChecksumInternal(options, env_options, read_options,
file_path, largest_seqno);
}
Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
@ -68,8 +85,8 @@ Status VerifySstFileChecksum(const Options& options,
!kImmortal, false /* force_direct_prefetch */, -1 /* level */);
reader_options.largest_seqno = largest_seqno;
s = ioptions.table_factory->NewTableReader(
reader_options, std::move(file_reader), file_size, &table_reader,
false /* prefetch_index_and_filter_in_cache */);
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {
return s;
}

15
db/convenience_impl.h Normal file
View File

@ -0,0 +1,15 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/db.h"
namespace ROCKSDB_NAMESPACE {
Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno = 0);
} // namespace ROCKSDB_NAMESPACE

View File

@ -43,18 +43,25 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
/*timestamp*/ nullptr);
}
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value,
std::string* timestamp) {
if (options.io_activity != Env::IOActivity::kUnknown) {
Status CompactedDBImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle*, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
assert(user_comparator_);
if (options.timestamp) {
const Status s = FailIfTsMismatchCf(
DefaultColumnFamily(), *(options.timestamp), /*ts_for_read=*/true);
if (read_options.timestamp) {
const Status s =
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
@ -74,7 +81,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
std::string* ts =
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, lkey.user_key(), value,
/*columns=*/nullptr, ts, nullptr, nullptr, true,
@ -88,8 +95,8 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
/*b_has_ts=*/false) < 0) {
return Status::NotFound();
}
Status s = f.fd.table_reader->Get(options, lkey.internal_key(), &get_context,
nullptr);
Status s = f.fd.table_reader->Get(read_options, lkey.internal_key(),
&get_context, nullptr);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
@ -106,14 +113,27 @@ std::vector<Status> CompactedDBImpl::MultiGet(
}
std::vector<Status> CompactedDBImpl::MultiGet(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const ReadOptions& _read_options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
assert(user_comparator_);
size_t num_keys = keys.size();
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
return std::vector<Status>(num_keys, s);
}
if (options.timestamp) {
Status s = FailIfTsMismatchCf(DefaultColumnFamily(), *(options.timestamp),
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
if (read_options.timestamp) {
Status s =
FailIfTsMismatchCf(DefaultColumnFamily(), *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
@ -136,7 +156,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
autovector<TableReader*, 16> reader_list;
for (const auto& key : keys) {
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
LookupKey lkey(key, kMaxSequenceNumber, read_options.timestamp);
const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
key, /*a_has_ts=*/false,
@ -159,14 +179,15 @@ std::vector<Status> CompactedDBImpl::MultiGet(
if (r != nullptr) {
PinnableSlice pinnable_val;
std::string& value = (*values)[idx];
LookupKey lkey(keys[idx], kMaxSequenceNumber, options.timestamp);
LookupKey lkey(keys[idx], kMaxSequenceNumber, read_options.timestamp);
std::string* timestamp = timestamps ? &(*timestamps)[idx] : nullptr;
GetContext get_context(
user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
lkey.user_key(), &pinnable_val, /*columns=*/nullptr,
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
Status s =
r->Get(read_options, lkey.internal_key(), &get_context, nullptr);
assert(static_cast<size_t>(idx) < statuses.size());
if (!s.ok() && !s.IsNotFound()) {
statuses[idx] = s;

View File

@ -30,9 +30,9 @@ class CompactedDBImpl : public DBImpl {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;
using DB::MultiGet;
// Note that CompactedDBImpl::MultiGet is not the optimized version of
@ -43,7 +43,7 @@ class CompactedDBImpl : public DBImpl {
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
std::vector<Status> MultiGet(const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys,
std::vector<std::string>* values,

View File

@ -28,6 +28,7 @@
#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/convenience_impl.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@ -74,7 +75,6 @@
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
@ -1922,15 +1922,43 @@ Status DBImpl::Get(const ReadOptions& read_options,
return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}
Status DBImpl::Get(const ReadOptions& read_options,
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value,
/*timestamp=*/nullptr);
}
Status DBImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
assert(value != nullptr);
value->Reset();
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
Status s = GetImpl(read_options, column_family, key, value, timestamp);
return s;
}
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
get_impl_options.timestamp = timestamp;
Status s = GetImpl(read_options, key, get_impl_options);
return s;
}
@ -1999,11 +2027,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
assert(get_impl_options.column_family);
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
@ -2297,7 +2320,7 @@ std::vector<Status> DBImpl::MultiGet(
}
std::vector<Status> DBImpl::MultiGet(
const ReadOptions& read_options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
@ -2309,12 +2332,30 @@ std::vector<Status> DBImpl::MultiGet(
assert(column_family.size() == num_keys);
std::vector<Status> stat_list(num_keys);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = s;
}
return stat_list;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
assert(column_family[i]);
if (read_options.timestamp) {
stat_list[i] = FailIfTsMismatchCf(
column_family[i], *(read_options.timestamp), /*ts_for_read=*/true);
stat_list[i] =
FailIfTsMismatchCf(column_family[i], *(read_options.timestamp),
/*ts_for_read=*/true);
if (!stat_list[i].ok()) {
should_fail = true;
}
@ -2627,10 +2668,26 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
/* timestamps */ nullptr, statuses, sorted_input);
}
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
void DBImpl::MultiGet(const ReadOptions& _read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
MultiGetCommon(read_options, num_keys, column_families, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}
@ -2645,7 +2702,6 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
if (num_keys == 0) {
return;
}
bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
ColumnFamilyHandle* cfh = column_families[i];
@ -2828,11 +2884,28 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
/* timestamps */ nullptr, statuses, sorted_input);
}
void DBImpl::MultiGet(const ReadOptions& read_options,
void DBImpl::MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
MultiGetCommon(read_options, column_family, num_keys, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}
@ -2881,9 +2954,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
void DBImpl::MultiGetWithCallback(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
assert(false);
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
std::function<MultiGetColumnFamilyData*(
@ -2954,11 +3038,6 @@ Status DBImpl::MultiGetImpl(
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* callback) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call MultiGet with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
@ -3383,8 +3462,19 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
return s.ok() || s.IsIncomplete();
}
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
if (read_options.managed) {
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
@ -3394,17 +3484,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators."));
}
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
}
assert(column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}
@ -3524,9 +3609,19 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
}
Status DBImpl::NewIterators(
const ReadOptions& read_options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return Status::InvalidArgument(
"Can only call NewIterators with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
if (read_options.managed) {
return Status::NotSupported("Managed iterator is not supported anymore.");
}
@ -3534,11 +3629,6 @@ Status DBImpl::NewIterators(
return Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call NewIterators with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
@ -5793,12 +5883,35 @@ Status DBImpl::ClipColumnFamily(ColumnFamilyHandle* column_family,
return status;
}
Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
Status DBImpl::VerifyFileChecksums(const ReadOptions& _read_options) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kVerifyFileChecksums) {
return Status::InvalidArgument(
"Can only call VerifyFileChecksums with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or "
"`Env::IOActivity::kVerifyFileChecksums`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kVerifyFileChecksums;
}
return VerifyChecksumInternal(read_options,
/*use_file_checksum=*/true);
}
Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
Status DBImpl::VerifyChecksum(const ReadOptions& _read_options) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kVerifyDBChecksum) {
return Status::InvalidArgument(
"Can only call VerifyChecksum with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kVerifyDBChecksum`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kVerifyDBChecksum;
}
return VerifyChecksumInternal(read_options,
/*use_file_checksum=*/false);
}
Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
@ -5810,12 +5923,6 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
Status s;
if (read_options.io_activity != Env::IOActivity::kUnknown) {
s = Status::InvalidArgument(
"Cannot verify file checksum with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
return s;
}
if (use_file_checksum) {
FileChecksumGenFactory* const file_checksum_gen_factory =
immutable_db_options_.file_checksum_gen_factory.get();
@ -5867,7 +5974,7 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
fmeta->file_checksum_func_name, fname,
read_options);
} else {
s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(
s = ROCKSDB_NAMESPACE::VerifySstFileChecksumInternal(
opts, file_options_, read_options, fname, fd.largest_seqno);
}
RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
@ -5930,12 +6037,6 @@ Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
const std::string& func_name_expected,
const std::string& fname,
const ReadOptions& read_options) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call VerifyChecksum with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
Status s;
if (file_checksum_expected == kUnknownFileChecksum) {
return s;
@ -5946,8 +6047,7 @@ Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
func_name_expected, &file_checksum, &func_name,
read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
io_tracer_, immutable_db_options_.rate_limiter.get(),
read_options.rate_limiter_priority);
io_tracer_, immutable_db_options_.rate_limiter.get(), read_options);
if (s.ok()) {
assert(func_name_expected == func_name);
if (file_checksum != file_checksum_expected) {

View File

@ -234,7 +234,7 @@ class DBImpl : public DB {
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
virtual Status Get(const ReadOptions& options,
virtual Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;
@ -265,7 +265,7 @@ class DBImpl : public DB {
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) override;
@ -280,8 +280,9 @@ class DBImpl : public DB {
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input = false) override;
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
void MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input = false) override;
@ -289,13 +290,13 @@ class DBImpl : public DB {
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
void MultiGet(const ReadOptions& options, const size_t num_keys,
void MultiGet(const ReadOptions& _read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input = false) override;
void MultiGetWithCallback(
const ReadOptions& options, ColumnFamilyHandle* column_family,
const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
@ -336,10 +337,10 @@ class DBImpl : public DB {
bool* value_found = nullptr) override;
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
virtual Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) override;
virtual Status NewIterators(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
@ -627,6 +628,14 @@ class DBImpl : public DB {
int* number_of_operands = nullptr;
};
Status GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value);
Status GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp);
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
// This function is also called by GetMergeOperands

View File

@ -36,14 +36,19 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
/*timestamp*/ nullptr);
}
Status DBImplReadOnly::Get(const ReadOptions& read_options,
Status DBImplReadOnly::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val,
std::string* timestamp) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
@ -52,8 +57,9 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
assert(column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
@ -116,17 +122,23 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
return s;
}
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
assert(column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}

View File

@ -28,14 +28,14 @@ class DBImplReadOnly : public DBImpl {
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;
// TODO: Implement ReadOnly MultiGet?
using DBImpl::NewIterator;
virtual Iterator* NewIterator(const ReadOptions&,
virtual Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) override;
virtual Status NewIterators(
@ -179,4 +179,3 @@ class DBImplReadOnly : public DBImpl {
friend class DB;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -340,16 +340,36 @@ Status DBImplSecondary::RecoverLogFiles(
}
// Implementation of the DB interface
Status DBImplSecondary::Get(const ReadOptions& read_options,
Status DBImplSecondary::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value,
/*timestamp*/ nullptr);
}
Status DBImplSecondary::Get(const ReadOptions& read_options,
Status DBImplSecondary::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value, timestamp);
}
@ -357,11 +377,6 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val,
std::string* timestamp) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
@ -452,8 +467,18 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
return s;
}
Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
if (read_options.managed) {
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
@ -462,16 +487,12 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators."));
}
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
}
assert(column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(
column_family, *(read_options.timestamp), /*ts_for_read=*/true);
const Status s =
FailIfTsMismatchCf(column_family, *(read_options.timestamp),
/*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}
@ -523,9 +544,19 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
}
Status DBImplSecondary::NewIterators(
const ReadOptions& read_options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return Status::InvalidArgument(
"Can only call NewIterators with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
if (read_options.managed) {
return Status::NotSupported("Managed iterator is not supported anymore.");
}
@ -533,11 +564,6 @@ Status DBImplSecondary::NewIterators(
return Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call NewIterators with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
ReadCallback* read_callback = nullptr; // No read callback provided.
if (iterators == nullptr) {
return Status::InvalidArgument("iterators not allowed to be nullptr");

View File

@ -96,12 +96,13 @@ class DBImplSecondary : public DBImpl {
// workaround, the secondaries can be opened with `max_open_files=-1` so that
// it eagerly keeps all talbe files open and is able to access the contents of
// deleted files via prior open fd.
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) override;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
@ -117,7 +118,7 @@ class DBImplSecondary : public DBImpl {
// deleted. As a partial hacky workaround, the secondaries can be opened with
// `max_open_files=-1` so that it eagerly keeps all talbe files open and is
// able to access the contents of deleted files via prior open fd.
Iterator* NewIterator(const ReadOptions&,
Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) override;
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
@ -127,7 +128,7 @@ class DBImplSecondary : public DBImpl {
bool expose_blob_index = false,
bool allow_refresh = true);
Status NewIterators(const ReadOptions& options,
Status NewIterators(const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
@ -324,4 +325,3 @@ class DBImplSecondary : public DBImpl {
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -77,6 +77,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
expose_blob_index_(expose_blob_index),
is_blob_(false),
arena_mode_(arena_mode),
io_activity_(read_options.io_activity),
db_impl_(db_impl),
cfd_(cfd),
timestamp_ub_(read_options.timestamp),
@ -196,12 +197,11 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
// TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
// avoid having to copy options back and forth.
// TODO: plumb Env::IOActivity
ReadOptions read_options;
read_options.read_tier = read_tier_;
read_options.fill_cache = fill_cache_;
read_options.verify_checksums = verify_checksums_;
read_options.io_activity = io_activity_;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;

View File

@ -384,6 +384,7 @@ class DBIter final : public Iterator {
bool expose_blob_index_;
bool is_blob_;
bool arena_mode_;
const Env::IOActivity io_activity_;
// List of operands for merge operator.
MergeContext merge_context_;
LocalStatistics local_stats_;

View File

@ -235,9 +235,18 @@ TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) {
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
ASSERT_OK(db_->VerifyChecksum(GetReadOptions()));
// There are 3 reads per file: ReadMetaIndexBlock,
// VerifyChecksumInMetaBlocks, VerifyChecksumInBlocks
int expected = kNumFiles * 3;
// In BufferedIO,
// there are 7 reads per file, each of which will be rate-limited.
// During open: read footer, meta index block, properties block, index block.
// During actual checksum verification: read meta index block, verify checksum
// in meta blocks and verify checksum in file blocks.
//
// In DirectIO, where we support tail prefetching, during table open, we only
// do 1 read instead of 4 as described above. Actual checksum verification
// reads stay the same.
int num_read_per_file = (!use_direct_io_) ? 7 : 4;
int expected = kNumFiles * num_read_per_file;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}

View File

@ -217,6 +217,8 @@ Status ExternalSstFileIngestionJob::Prepare(
std::string requested_checksum_func_name;
// TODO: rate limit file reads for checksum calculation during file
// ingestion.
// TODO: plumb Env::IOActivity
ReadOptions ro;
IOStatus io_s = GenerateOneFileChecksum(
fs_.get(), files_to_ingest_[i].internal_file_path,
db_options_.file_checksum_gen_factory.get(),
@ -224,8 +226,7 @@ Status ExternalSstFileIngestionJob::Prepare(
&generated_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,
db_options_.allow_mmap_reads, io_tracer_,
db_options_.rate_limiter.get(),
Env::IO_TOTAL /* rate_limiter_priority */);
db_options_.rate_limiter.get(), ro);
if (!io_s.ok()) {
status = io_s;
ROCKS_LOG_WARN(db_options_.info_log,
@ -1058,13 +1059,15 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
std::string file_checksum_func_name;
std::string requested_checksum_func_name;
// TODO: rate limit file reads for checksum calculation during file ingestion.
// TODO: plumb Env::IOActivity
ReadOptions ro;
IOStatus io_s = GenerateOneFileChecksum(
fs_.get(), file_to_ingest->internal_file_path,
db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name,
&file_checksum, &file_checksum_func_name,
ingestion_options_.verify_checksums_readahead_size,
db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(),
Env::IO_TOTAL /* rate_limiter_priority */);
ro);
if (!io_s.ok()) {
return io_s;
}

View File

@ -248,6 +248,7 @@ DECLARE_bool(avoid_flush_during_recovery);
DECLARE_uint64(max_write_batch_group_size_bytes);
DECLARE_bool(level_compaction_dynamic_level_bytes);
DECLARE_int32(verify_checksum_one_in);
DECLARE_int32(verify_file_checksums_one_in);
DECLARE_int32(verify_db_one_in);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(get_property_one_in);

View File

@ -14,6 +14,7 @@
namespace ROCKSDB_NAMESPACE {
void ThreadBody(void* v) {
ThreadStatusUtil::RegisterThread(db_stress_env, ThreadStatus::USER);
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
@ -54,6 +55,7 @@ void ThreadBody(void* v) {
shared->GetCondVar()->SignalAll();
}
}
ThreadStatusUtil::UnregisterThread();
}
bool RunStressTestImpl(SharedState* shared) {
SystemClock* clock = db_stress_env->GetSystemClock().get();

View File

@ -32,6 +32,48 @@ class DbStressRandomAccessFileWrapper : public FSRandomAccessFileOwnerWrapper {
#endif
return target()->Read(offset, n, options, result, scratch, dbg);
}
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override {
#ifndef NDEBUG
const ThreadStatus::OperationType thread_op =
ThreadStatusUtil::GetThreadOperation();
Env::IOActivity io_activity =
ThreadStatusUtil::TEST_GetExpectedIOActivity(thread_op);
assert(io_activity == Env::IOActivity::kUnknown ||
io_activity == options.io_activity);
#endif
return target()->MultiRead(reqs, num_reqs, options, dbg);
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
#ifndef NDEBUG
const ThreadStatus::OperationType thread_op =
ThreadStatusUtil::GetThreadOperation();
Env::IOActivity io_activity =
ThreadStatusUtil::TEST_GetExpectedIOActivity(thread_op);
assert(io_activity == Env::IOActivity::kUnknown ||
io_activity == options.io_activity);
#endif
return target()->Prefetch(offset, n, options, dbg);
}
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& options,
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override {
#ifndef NDEBUG
const ThreadStatus::OperationType thread_op =
ThreadStatusUtil::GetThreadOperation();
Env::IOActivity io_activity =
ThreadStatusUtil::TEST_GetExpectedIOActivity(thread_op);
assert(io_activity == Env::IOActivity::kUnknown ||
io_activity == options.io_activity);
#endif
return target()->ReadAsync(req, options, cb, cb_arg, io_handle, del_fn,
dbg);
}
};
class DbStressFSWrapper : public FileSystemWrapper {

View File

@ -929,6 +929,13 @@ DEFINE_int32(verify_checksum_one_in, 0,
" checksum verification of all the files in the database once for"
" every N ops on average. 0 indicates that calls to"
" VerifyChecksum() are disabled.");
DEFINE_int32(verify_file_checksums_one_in, 0,
"If non-zero, then DB::VerifyFileChecksums() will be called to do"
" checksum verification of all the files in the database once for"
" every N ops on average. 0 indicates that calls to"
" VerifyFileChecksums() are disabled.");
DEFINE_int32(verify_db_one_in, 0,
"If non-zero, call VerifyDb() once for every N ops. 0 indicates "
"that VerifyDb() will not be called in OperateDb(). Note that "

View File

@ -929,12 +929,28 @@ void StressTest::OperateDb(ThreadState* thread) {
}
if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM);
Status status = db_->VerifyChecksum();
ThreadStatusUtil::ResetThreadStatus();
if (!status.ok()) {
VerificationAbort(shared, "VerifyChecksum status not OK", status);
}
}
if (thread->rand.OneInOpt(FLAGS_verify_file_checksums_one_in)) {
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS);
Status status = db_->VerifyFileChecksums(read_opts);
ThreadStatusUtil::ResetThreadStatus();
if (!status.ok()) {
VerificationAbort(shared, "VerifyFileChecksums status not OK",
status);
}
}
if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
TestGetProperty(thread);
}
@ -1034,10 +1050,18 @@ void StressTest::OperateDb(ThreadState* thread) {
// If its the last iteration, ensure that multiget_batch_size is 1
multiget_batch_size = std::max(multiget_batch_size, 1);
rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_MULTIGET);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
ThreadStatusUtil::ResetThreadStatus();
i += multiget_batch_size - 1;
} else {
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_GET);
TestGet(thread, read_opts, rand_column_families, rand_keys);
ThreadStatusUtil::ResetThreadStatus();
}
} else if (prob_op < prefix_bound) {
assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
@ -1066,8 +1090,12 @@ void StressTest::OperateDb(ThreadState* thread) {
if (!FLAGS_skip_verifydb &&
thread->rand.OneInOpt(
FLAGS_verify_iterator_with_expected_state_one_in)) {
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_DBITERATOR);
TestIterateAgainstExpected(thread, read_opts, rand_column_families,
rand_keys);
ThreadStatusUtil::ResetThreadStatus();
} else {
int num_seeks = static_cast<int>(std::min(
std::max(static_cast<uint64_t>(thread->rand.Uniform(4)),
@ -1076,7 +1104,11 @@ void StressTest::OperateDb(ThreadState* thread) {
static_cast<uint64_t>(1))));
rand_keys = GenerateNKeys(thread, num_seeks, i);
i += num_seeks - 1;
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_DBITERATOR);
TestIterate(thread, read_opts, rand_column_families, rand_keys);
ThreadStatusUtil::ResetThreadStatus();
}
} else {
assert(iterate_bound <= prob_op);

View File

@ -585,6 +585,7 @@ class NonBatchedOpsStressTest : public StressTest {
bool do_consistency_check = thread->rand.OneIn(4);
ReadOptions readoptionscopy = read_opts;
if (do_consistency_check) {
readoptionscopy.snapshot = db_->GetSnapshot();
}
@ -778,9 +779,17 @@ class NonBatchedOpsStressTest : public StressTest {
if (use_txn) {
assert(txn);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_GET);
tmp_s = txn->Get(readoptionscopy, cfh, key, &value);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_MULTIGET);
} else {
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_GET);
tmp_s = db_->Get(readoptionscopy, cfh, key, &value);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_MULTIGET);
}
if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
fprintf(stderr, "Get error: %s\n", s.ToString().c_str());

View File

@ -81,13 +81,12 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
Status FilePrefetchBuffer::Read(const IOOptions& opts,
RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority,
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index) {
Slice result;
Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result,
bufs_[index].buffer_.BufferStart() + chunk_len,
/*aligned_buf=*/nullptr, rate_limiter_priority);
/*aligned_buf=*/nullptr);
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
@ -134,8 +133,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority) {
uint64_t offset, size_t n) {
if (!enable_ || reader == nullptr) {
return Status::OK();
}
@ -160,8 +158,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
true /*refit_tail*/, chunk_len);
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
Status s = Read(opts, reader, rate_limiter_priority, read_len, chunk_len,
rounddown_offset, curr_);
Status s = Read(opts, reader, read_len, chunk_len, rounddown_offset, curr_);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
RecordInHistogram(stats_, TABLE_OPEN_PREFETCH_TAIL_READ_BYTES, read_len);
}
@ -328,8 +325,7 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) {
Status FilePrefetchBuffer::HandleOverlappingData(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
Env::IOPriority /*rate_limiter_priority*/, bool& copy_to_third_buffer,
size_t length, size_t readahead_size, bool& copy_to_third_buffer,
uint64_t& tmp_offset, size_t& tmp_length) {
Status s;
size_t alignment = reader->file()->GetRequiredBufferAlignment();
@ -412,9 +408,10 @@ Status FilePrefetchBuffer::HandleOverlappingData(
// curr_, send async request on curr_, wait for poll to fill second
// buffer (if any), and copy remaining data from second buffer to third
// buffer.
Status FilePrefetchBuffer::PrefetchAsyncInternal(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority,
Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool& copy_to_third_buffer) {
if (!enable_) {
return Status::OK();
@ -442,8 +439,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
// - switch buffers and curr_ now points to second buffer to copy remaining
// data.
s = HandleOverlappingData(opts, reader, offset, length, readahead_size,
rate_limiter_priority, copy_to_third_buffer,
tmp_offset, tmp_length);
copy_to_third_buffer, tmp_offset, tmp_length);
if (!s.ok()) {
return s;
}
@ -581,8 +577,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
}
if (read_len1 > 0) {
s = Read(opts, reader, rate_limiter_priority, read_len1, chunk_len1,
rounddown_start1, curr_);
s = Read(opts, reader, read_len1, chunk_len1, rounddown_start1, curr_);
if (!s.ok()) {
if (bufs_[second].io_handle_ != nullptr) {
std::vector<void*> handles;
@ -610,10 +605,9 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
Env::IOPriority rate_limiter_priority,
bool for_compaction /* = false */) {
bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status,
rate_limiter_priority, for_compaction);
for_compaction);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
if (ret) {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
@ -627,7 +621,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */) {
bool for_compaction /* = false */) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@ -647,8 +641,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
rate_limiter_priority);
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
@ -657,8 +650,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
return false;
}
}
s = Prefetch(opts, reader, offset, n + readahead_size_,
rate_limiter_priority);
s = Prefetch(opts, reader, offset, n + readahead_size_);
}
if (!s.ok()) {
if (status) {
@ -681,12 +673,12 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
return true;
}
bool FilePrefetchBuffer::TryReadFromCacheAsync(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority) {
bool ret = TryReadFromCacheAsyncUntracked(opts, reader, offset, n, result,
status, rate_limiter_priority);
bool FilePrefetchBuffer::TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status) {
bool ret =
TryReadFromCacheAsyncUntracked(opts, reader, offset, n, result, status);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
if (ret) {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
@ -699,8 +691,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority) {
size_t n, Slice* result, Status* status) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@ -755,7 +746,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
rate_limiter_priority, copy_to_third_buffer);
copy_to_third_buffer);
explicit_prefetch_submitted_ = false;
if (!s.ok()) {
if (status) {

View File

@ -183,14 +183,12 @@ class FilePrefetchBuffer {
bool Enabled() const { return enable_; }
// Load data into the buffer from a file.
// opts : the IO options to use.
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
// rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
// bypass.
Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority);
uint64_t offset, size_t n);
// Request for reading the data from a file asynchronously.
// If data already exists in the buffer, result will be updated.
@ -217,18 +215,14 @@ class FilePrefetchBuffer {
// n : the number of bytes.
// result : output buffer to put the data into.
// s : output status.
// rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
// bypass.
// for_compaction : true if cache read is done for compaction read.
bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result, Status* s,
Env::IOPriority rate_limiter_priority,
bool for_compaction = false);
bool TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority);
size_t n, Slice* result, Status* status);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.
@ -305,12 +299,11 @@ class FilePrefetchBuffer {
Status PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer);
Status Read(const IOOptions& opts, RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority, uint64_t read_len,
uint64_t chunk_len, uint64_t rounddown_start, uint32_t index);
uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start,
uint32_t index);
Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t read_len, uint64_t rounddown_start, uint32_t index);
@ -409,7 +402,6 @@ class FilePrefetchBuffer {
Status HandleOverlappingData(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
Env::IOPriority rate_limiter_priority,
bool& copy_to_third_buffer, uint64_t& tmp_offset,
size_t& tmp_length);
@ -417,14 +409,12 @@ class FilePrefetchBuffer {
RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result,
Status* s,
Env::IOPriority rate_limiter_priority,
bool for_compaction = false);
bool TryReadFromCacheAsyncUntracked(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result,
Status* status,
Env::IOPriority rate_limiter_priority);
Status* status);
std::vector<BufferInfo> bufs_;
// curr_ represents the index for bufs_ indicating which buffer is being

View File

@ -137,7 +137,7 @@ IOStatus GenerateOneFileChecksum(
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool /*allow_mmap_reads*/,
std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
Env::IOPriority rate_limiter_priority) {
const ReadOptions& read_options) {
if (checksum_factory == nullptr) {
return IOStatus::InvalidArgument("Checksum factory is invalid");
}
@ -206,11 +206,15 @@ IOStatus GenerateOneFileChecksum(
Slice slice;
uint64_t offset = 0;
IOOptions opts;
io_s = reader->PrepareIOOptions(read_options, opts);
if (!io_s.ok()) {
return io_s;
}
while (size > 0) {
size_t bytes_to_read =
static_cast<size_t>(std::min(uint64_t{readahead_size}, size));
io_s = reader->Read(opts, offset, bytes_to_read, &slice, buf.get(), nullptr,
rate_limiter_priority);
io_s =
reader->Read(opts, offset, bytes_to_read, &slice, buf.get(), nullptr);
if (!io_s.ok()) {
return IOStatus::Corruption("file read failed with error: " +
io_s.ToString());

View File

@ -59,7 +59,7 @@ extern IOStatus GenerateOneFileChecksum(
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
Env::IOPriority rate_limiter_priority);
const ReadOptions& read_options);
inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
SystemClock* clock, IOOptions& opts) {

View File

@ -2606,8 +2606,10 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
fpb.UpdateReadPattern(0, 4096, false);
// Now read some data that straddles the two prefetch buffers - offset 8192 to
// 16384
ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192,
&result, &s, Env::IOPriority::IO_LOW));
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(
fpb.TryReadFromCacheAsync(io_opts, r.get(), 8192, 8192, &result, &s));
}
TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
@ -2642,9 +2644,10 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
}
ASSERT_TRUE(s.IsTryAgain());
ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), /*offset=*/3000,
/*length=*/4000, &async_result, &s,
Env::IOPriority::IO_LOW));
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCacheAsync(io_opts, r.get(), /*offset=*/3000,
/*length=*/4000, &async_result, &s));
// No sync call should be made.
HistogramData sst_read_micros;
stats()->histogramData(SST_READ_MICROS, &sst_read_micros);

View File

@ -22,12 +22,37 @@
#include "util/rate_limiter_impl.h"
namespace ROCKSDB_NAMESPACE {
const std::array<Histograms, std::size_t(Env::IOActivity::kUnknown)>
kReadHistograms{{
FILE_READ_FLUSH_MICROS,
FILE_READ_COMPACTION_MICROS,
FILE_READ_DB_OPEN_MICROS,
}};
inline Histograms GetFileReadHistograms(Statistics* stats,
Env::IOActivity io_activity) {
switch (io_activity) {
case Env::IOActivity::kFlush:
return Histograms::FILE_READ_FLUSH_MICROS;
case Env::IOActivity::kCompaction:
return Histograms::FILE_READ_COMPACTION_MICROS;
case Env::IOActivity::kDBOpen:
return Histograms::FILE_READ_DB_OPEN_MICROS;
default:
break;
}
if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) {
switch (io_activity) {
case Env::IOActivity::kGet:
return Histograms::FILE_READ_GET_MICROS;
case Env::IOActivity::kMultiGet:
return Histograms::FILE_READ_MULTIGET_MICROS;
case Env::IOActivity::kDBIterator:
return Histograms::FILE_READ_DB_ITERATOR_MICROS;
case Env::IOActivity::kVerifyDBChecksum:
return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
case Env::IOActivity::kVerifyFileChecksums:
return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
default:
break;
}
}
return Histograms::HISTOGRAM_ENUM_MAX;
}
inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
bool is_last_level, size_t size) {
IOSTATS_ADD(bytes_read, size);
@ -79,11 +104,11 @@ IOStatus RandomAccessFileReader::Create(
return io_s;
}
IOStatus RandomAccessFileReader::Read(
const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority) const {
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf) const {
(void)aligned_buf;
const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
@ -108,9 +133,7 @@ IOStatus RandomAccessFileReader::Read(
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
GetFileReadHistograms(stats_, opts.io_activity),
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
@ -277,9 +300,10 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
return true;
}
IOStatus RandomAccessFileReader::MultiRead(
const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
FSReadRequest* read_reqs,
size_t num_reqs,
AlignedBuf* aligned_buf) const {
(void)aligned_buf; // suppress warning of unused variable in LITE mode
assert(num_reqs > 0);
@ -288,6 +312,7 @@ IOStatus RandomAccessFileReader::MultiRead(
assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
}
#endif // !NDEBUG
const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
// To be paranoid modify scratch a little bit, so in case underlying
// FileSystem doesn't fill the buffer but return success and `scratch` returns
@ -304,9 +329,7 @@ IOStatus RandomAccessFileReader::MultiRead(
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
GetFileReadHistograms(stats_, opts.io_activity),
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
@ -495,16 +518,16 @@ IOStatus RandomAccessFileReader::ReadAsync(
assert(read_async_info->buf_.CurrentSize() == 0);
StopWatch sw(clock_, nullptr /*stats*/,
Histograms::HISTOGRAM_ENUM_MAX /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
StopWatch sw(clock_, stats_, hist_type_,
GetFileReadHistograms(stats_, opts.io_activity),
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(aligned_req, opts, read_async_callback,
read_async_info, io_handle, del_fn, nullptr /*dbg*/);
} else {
StopWatch sw(clock_, nullptr /*stats*/,
Histograms::HISTOGRAM_ENUM_MAX /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
StopWatch sw(clock_, stats_, hist_type_,
GetFileReadHistograms(stats_, opts.io_activity),
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);

View File

@ -164,31 +164,18 @@ class RandomAccessFileReader {
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a
// region in aligned_buf.
//
// `rate_limiter_priority` is used to charge the internal rate limiter when
// enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
// rate limiter.
IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority) const;
char* scratch, AlignedBuf* aligned_buf) const;
// REQUIRES:
// num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
//
// `rate_limiter_priority` will be used to charge the internal rate limiter.
// It is not yet supported so the client must provide the special value
// `Env::IO_TOTAL` to bypass the rate limiter.
IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
size_t num_reqs, AlignedBuf* aligned_buf,
Env::IOPriority rate_limiter_priority) const;
size_t num_reqs, AlignedBuf* aligned_buf) const;
IOStatus Prefetch(uint64_t offset, size_t n,
const Env::IOPriority rate_limiter_priority) const {
IOOptions opts;
opts.rate_limiter_priority = rate_limiter_priority;
IOStatus Prefetch(const IOOptions& opts, uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n, opts, nullptr);
}

View File

@ -83,8 +83,9 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
Slice result;
AlignedBuf buf;
for (Env::IOPriority rate_limiter_priority : {Env::IO_LOW, Env::IO_TOTAL}) {
ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
rate_limiter_priority));
IOOptions io_opts;
io_opts.rate_limiter_priority = rate_limiter_priority;
ASSERT_OK(r->Read(io_opts, offset, len, &result, nullptr, &buf));
ASSERT_EQ(result.ToString(), content.substr(offset, len));
}
}
@ -146,8 +147,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
Env::IO_TOTAL /*rate_limiter_priority*/));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
@ -191,8 +192,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
Env::IO_TOTAL /*rate_limiter_priority*/));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
@ -236,8 +237,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
Env::IO_TOTAL /*rate_limiter_priority*/));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);
@ -273,8 +274,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
Env::IO_TOTAL /*rate_limiter_priority*/));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
AssertResult(content, reqs);

View File

@ -99,6 +99,9 @@ class SequentialFileReader {
// when less than n bytes are actually read (e.g. at end of file). To avoid
// overcharging the rate limiter, the caller can use file size to cap n to
// read until end of file.
//
// TODO(hx235): accept parameter `IOOptions` containing
// `rate_limiter_priority` like RandomAccessFileReader::Read()
IOStatus Read(size_t n, Slice* result, char* scratch,
Env::IOPriority rate_limiter_priority);

View File

@ -459,7 +459,7 @@ Status VerifySstFileChecksum(const Options& options,
// Verify the checksum of file
Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const ReadOptions& _read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno = 0);

View File

@ -441,6 +441,11 @@ class Env : public Customizable {
kFlush = 0,
kCompaction = 1,
kDBOpen = 2,
kGet = 3,
kMultiGet = 4,
kDBIterator = 5,
kVerifyDBChecksum = 6,
kVerifyFileChecksums = 7,
kUnknown, // Keep last for easy array of non-unknowns
};

View File

@ -555,6 +555,13 @@ enum Histograms : uint32_t {
FILE_READ_FLUSH_MICROS,
FILE_READ_COMPACTION_MICROS,
FILE_READ_DB_OPEN_MICROS,
// The following `FILE_READ_*` require stats level greater than
// `StatsLevel::kExceptDetailedTimers`
FILE_READ_GET_MICROS,
FILE_READ_MULTIGET_MICROS,
FILE_READ_DB_ITERATOR_MICROS,
FILE_READ_VERIFY_DB_CHECKSUM_MICROS,
FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS,
// The number of subcompactions actually scheduled during a compaction
NUM_SUBCOMPACTIONS_SCHEDULED,

View File

@ -57,6 +57,11 @@ struct ThreadStatus {
OP_COMPACTION,
OP_FLUSH,
OP_DBOPEN,
OP_GET,
OP_MULTIGET,
OP_DBITERATOR,
OP_VERIFY_DB_CHECKSUM,
OP_VERIFY_FILE_CHECKSUMS,
NUM_OP_TYPES
};

View File

@ -335,8 +335,22 @@ class Transaction {
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool /*sorted_input*/ = false) {
if (options.io_activity != Env::IOActivity::kUnknown &&
options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Get(options, column_family, keys[i], &values[i]);
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = GetImpl(options, column_family, keys[i], &values[i]);
}
}
@ -673,6 +687,21 @@ class Transaction {
id_ = id;
}
virtual Status GetImpl(const ReadOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */, std::string* /* value */) {
return Status::NotSupported("Not implemented");
}
virtual Status GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val) {
assert(pinnable_val != nullptr);
auto s = GetImpl(options, column_family, key, pinnable_val->GetSelf());
pinnable_val->PinSelf();
return s;
}
virtual uint64_t GetLastLogNumber() const { return log_number_; }
private:

View File

@ -5629,6 +5629,17 @@ class HistogramTypeJni {
return 0x3B;
case ROCKSDB_NAMESPACE::Histograms::FILE_READ_DB_OPEN_MICROS:
return 0x3C;
case ROCKSDB_NAMESPACE::Histograms::FILE_READ_GET_MICROS:
return 0x3D;
case ROCKSDB_NAMESPACE::Histograms::FILE_READ_MULTIGET_MICROS:
return 0x3E;
case ROCKSDB_NAMESPACE::Histograms::FILE_READ_DB_ITERATOR_MICROS:
return 0x3F;
case ROCKSDB_NAMESPACE::Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS:
return 0x40;
case ROCKSDB_NAMESPACE::Histograms::
FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS:
return 0x41;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
@ -5754,6 +5765,18 @@ class HistogramTypeJni {
return ROCKSDB_NAMESPACE::Histograms::FILE_READ_COMPACTION_MICROS;
case 0x3C:
return ROCKSDB_NAMESPACE::Histograms::FILE_READ_DB_OPEN_MICROS;
case 0x3D:
return ROCKSDB_NAMESPACE::Histograms::FILE_READ_GET_MICROS;
case 0x3E:
return ROCKSDB_NAMESPACE::Histograms::FILE_READ_MULTIGET_MICROS;
case 0x3F:
return ROCKSDB_NAMESPACE::Histograms::FILE_READ_DB_ITERATOR_MICROS;
case 0x40:
return ROCKSDB_NAMESPACE::Histograms::
FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
case 0x41:
return ROCKSDB_NAMESPACE::Histograms::
FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

View File

@ -175,6 +175,16 @@ public enum HistogramType {
FILE_READ_DB_OPEN_MICROS((byte) 0x3C),
FILE_READ_GET_MICROS((byte) 0x3D),
FILE_READ_MULTIGET_MICROS((byte) 0x3E),
FILE_READ_DB_ITERATOR_MICROS((byte) 0x3F),
FILE_READ_VERIFY_DB_CHECKSUM_MICROS((byte) 0x40),
FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS((byte) 0x41),
// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

View File

@ -1555,8 +1555,7 @@ static void RandomAccessFileReaderRead(benchmark::State& state) {
uint64_t idx = 0;
for (auto _ : state) {
s = readers[idx++ % kFileNum]->Read(io_options, 0, kDefaultPageSize / 3,
&result, scratch.get(), nullptr,
Env::IO_TOTAL);
&result, scratch.get(), nullptr);
if (!s.ok()) {
state.SkipWithError(s.ToString().c_str());
}

View File

@ -281,6 +281,13 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{FILE_READ_FLUSH_MICROS, "rocksdb.file.read.flush.micros"},
{FILE_READ_COMPACTION_MICROS, "rocksdb.file.read.compaction.micros"},
{FILE_READ_DB_OPEN_MICROS, "rocksdb.file.read.db.open.micros"},
{FILE_READ_GET_MICROS, "rocksdb.file.read.get.micros"},
{FILE_READ_MULTIGET_MICROS, "rocksdb.file.read.multiget.micros"},
{FILE_READ_DB_ITERATOR_MICROS, "rocksdb.file.read.db.iterator.micros"},
{FILE_READ_VERIFY_DB_CHECKSUM_MICROS,
"rocksdb.file.read.verify.db.checksum.micros"},
{FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS,
"rocksdb.file.read.verify.file.checksums.micros"},
{NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"},
{BYTES_PER_READ, "rocksdb.bytes.per.read"},
{BYTES_PER_WRITE, "rocksdb.bytes.per.write"},

View File

@ -36,6 +36,16 @@ Env::IOActivity ThreadStatusUtil::TEST_GetExpectedIOActivity(
return Env::IOActivity::kCompaction;
case ThreadStatus::OperationType::OP_DBOPEN:
return Env::IOActivity::kDBOpen;
case ThreadStatus::OperationType::OP_GET:
return Env::IOActivity::kGet;
case ThreadStatus::OperationType::OP_MULTIGET:
return Env::IOActivity::kMultiGet;
case ThreadStatus::OperationType::OP_DBITERATOR:
return Env::IOActivity::kDBIterator;
case ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM:
return Env::IOActivity::kVerifyDBChecksum;
case ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS:
return Env::IOActivity::kVerifyFileChecksums;
default:
return Env::IOActivity::kUnknown;
}

View File

@ -281,7 +281,7 @@ void BlockBasedTableIterator::InitDataBlock() {
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
/*no_sequential_checking=*/false, read_options_.rate_limiter_priority);
/*no_sequential_checking=*/false, read_options_);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,
@ -326,7 +326,7 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
read_options_.rate_limiter_priority);
read_options_);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(

View File

@ -860,10 +860,11 @@ Status BlockBasedTable::PrefetchTail(
&prefetch_off_len_pair);
#endif // NDEBUG
IOOptions opts;
Status s = file->PrepareIOOptions(ro, opts);
// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(prefetch_off, prefetch_len, ro.rate_limiter_priority)
.IsNotSupported()) {
if (s.ok() && !file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(opts, prefetch_off, prefetch_len).IsNotSupported()) {
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */));
@ -879,12 +880,8 @@ Status BlockBasedTable::PrefetchTail(
nullptr /* fs */, nullptr /* clock */, stats,
FilePrefetchBufferUsage::kTableOpenPrefetchTail));
IOOptions opts;
Status s = file->PrepareIOOptions(ro, opts);
if (s.ok()) {
s = (*prefetch_buffer)
->Prefetch(opts, file, prefetch_off, prefetch_len,
ro.rate_limiter_priority);
s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
}
return s;
}

View File

@ -144,7 +144,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
if (file->use_direct_io()) {
#endif // WITH_COROUTINES
s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
&direct_io_buf, options.rate_limiter_priority);
&direct_io_buf);
#if defined(WITH_COROUTINES)
} else {
co_await batch->context()->reader().MultiReadAsync(

View File

@ -12,11 +12,12 @@
#include "table/block_based/block_based_table_reader.h"
namespace ROCKSDB_NAMESPACE {
void BlockPrefetcher::PrefetchIfNeeded(
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
const size_t readahead_size, bool is_for_compaction,
void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle,
const size_t readahead_size,
bool is_for_compaction,
const bool no_sequential_checking,
const Env::IOPriority rate_limiter_priority) {
const ReadOptions& read_options) {
const size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
const size_t offset = handle.offset();
@ -27,8 +28,12 @@ void BlockPrefetcher::PrefetchIfNeeded(
if (offset + len <= readahead_limit_) {
return;
}
Status s = rep->file->Prefetch(offset, len + compaction_readahead_size_,
rate_limiter_priority);
IOOptions opts;
Status s = rep->file->PrepareIOOptions(read_options, opts);
if (!s.ok()) {
return;
}
s = rep->file->Prefetch(opts, offset, len + compaction_readahead_size_);
if (s.ok()) {
readahead_limit_ = offset + len + compaction_readahead_size_;
return;
@ -117,10 +122,14 @@ void BlockPrefetcher::PrefetchIfNeeded(
// If prefetch is not supported, fall back to use internal prefetch buffer.
// Discarding other return status of Prefetch calls intentionally, as
// we can fallback to reading from disk if Prefetch fails.
Status s = rep->file->Prefetch(
handle.offset(),
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_,
rate_limiter_priority);
IOOptions opts;
Status s = rep->file->PrepareIOOptions(read_options, opts);
if (!s.ok()) {
return;
}
s = rep->file->Prefetch(
opts, handle.offset(),
BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,

View File

@ -22,7 +22,7 @@ class BlockPrefetcher {
const BlockHandle& handle, size_t readahead_size,
bool is_for_compaction,
const bool no_sequential_checking,
Env::IOPriority rate_limiter_priority);
const ReadOptions& read_options);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
void UpdateReadPattern(const uint64_t& offset, const size_t& len) {

View File

@ -503,8 +503,7 @@ Status PartitionedFilterBlockReader::CacheDependencies(
s = rep->file->PrepareIOOptions(ro, opts);
if (s.ok()) {
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len),
ro.rate_limiter_priority);
static_cast<size_t>(prefetch_len));
}
if (!s.ok()) {
return s;

View File

@ -91,8 +91,7 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, partitioned_index_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/false,
read_options_.rate_limiter_priority);
is_for_compaction, /*no_sequential_checking=*/false, read_options_);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(
read_options_, partitioned_index_handle, &block_iter_,

View File

@ -175,8 +175,7 @@ Status PartitionIndexReader::CacheDependencies(
Status s = rep->file->PrepareIOOptions(ro, opts);
if (s.ok()) {
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len),
ro.rate_limiter_priority);
static_cast<size_t>(prefetch_len));
}
if (!s.ok()) {
return s;

View File

@ -80,11 +80,11 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (read_options_.async_io && !for_compaction_) {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, read_options_.rate_limiter_priority);
&io_s);
} else {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, read_options_.rate_limiter_priority, for_compaction_);
&io_s, for_compaction_);
}
if (read_from_prefetch_buffer) {
ProcessTrailerIfPresent();
@ -259,18 +259,18 @@ IOStatus BlockFetcher::ReadBlockContents() {
if (file_->use_direct_io()) {
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(block_read_cpu_time, nullptr);
io_status_ = file_->Read(
opts, handle_.offset(), block_size_with_trailer_, &slice_, nullptr,
&direct_io_buf_, read_options_.rate_limiter_priority);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, nullptr, &direct_io_buf_);
PERF_COUNTER_ADD(block_read_count, 1);
used_buf_ = const_cast<char*>(slice_.data());
} else {
PrepareBufferForBlockFromFile();
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(block_read_cpu_time, nullptr);
io_status_ = file_->Read(opts, handle_.offset(),
block_size_with_trailer_, &slice_, used_buf_,
nullptr, read_options_.rate_limiter_priority);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, used_buf_, nullptr);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
if (slice_.data() == &stack_buf_[0]) {

View File

@ -122,8 +122,7 @@ class CuckooBuilderTest : public testing::Test {
for (uint32_t i = 0; i + 1 < table_size + cuckoo_block_size; ++i) {
Slice read_slice;
ASSERT_OK(file_reader->Read(IOOptions(), i * bucket_size, bucket_size,
&read_slice, nullptr, nullptr,
Env::IO_TOTAL /* rate_limiter_priority */));
&read_slice, nullptr, nullptr));
size_t key_idx =
std::find(expected_locations.begin(), expected_locations.end(), i) -
expected_locations.begin();

View File

@ -144,9 +144,8 @@ CuckooTableReader::CuckooTableReader(
*reinterpret_cast<const uint32_t*>(cuckoo_block_size->second.data());
cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1;
// TODO: rate limit reads of whole cuckoo tables.
status_ =
file_->Read(IOOptions(), 0, static_cast<size_t>(file_size), &file_data_,
nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
status_ = file_->Read(IOOptions(), 0, static_cast<size_t>(file_size),
&file_data_, nullptr, nullptr);
}
Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,

View File

@ -509,18 +509,16 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
// need to pass a timeout at that point
// TODO: rate limit footer reads.
if (prefetch_buffer == nullptr ||
!prefetch_buffer->TryReadFromCache(
opts, file, read_offset, Footer::kMaxEncodedLength, &footer_input,
nullptr, opts.rate_limiter_priority)) {
!prefetch_buffer->TryReadFromCache(opts, file, read_offset,
Footer::kMaxEncodedLength,
&footer_input, nullptr)) {
if (file->use_direct_io()) {
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
&footer_input, nullptr, &internal_buf,
opts.rate_limiter_priority);
&footer_input, nullptr, &internal_buf);
} else {
footer_buf.reserve(Footer::kMaxEncodedLength);
s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
&footer_input, &footer_buf[0], nullptr,
opts.rate_limiter_priority);
&footer_input, &footer_buf[0], nullptr);
}
if (!s.ok()) return s;
}

View File

@ -305,8 +305,7 @@ Status MockTableFactory::GetIDFromFile(RandomAccessFileReader* file,
uint32_t* id) const {
char buf[4];
Slice result;
Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr);
assert(result.size() == 4);
*id = DecodeFixed32(buf);
return s;

View File

@ -215,8 +215,7 @@ bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
// TODO: rate limit plain table reads.
Status s =
file_info_->file->Read(IOOptions(), file_offset, size_to_read,
&read_result, new_buffer->buf.get(), nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
&read_result, new_buffer->buf.get(), nullptr);
if (!s.ok()) {
status_ = s;
return false;

View File

@ -284,9 +284,9 @@ void PlainTableReader::FillBloom(const std::vector<uint32_t>& prefix_hashes) {
Status PlainTableReader::MmapDataIfNeeded() {
if (file_info_.is_mmap_mode) {
// Get mmapped memory.
return file_info_.file->Read(
IOOptions(), 0, static_cast<size_t>(file_size_), &file_info_.file_data,
nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
return file_info_.file->Read(IOOptions(), 0,
static_cast<size_t>(file_size_),
&file_info_.file_data, nullptr, nullptr);
}
return Status::OK();
}

View File

@ -109,8 +109,7 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
uint64_t prefetch_off = file_size - prefetch_size;
IOOptions opts;
s = prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off,
static_cast<size_t>(prefetch_size),
Env::IO_TOTAL /* rate_limiter_priority */);
static_cast<size_t>(prefetch_size));
s = ReadFooterFromFile(opts, file_.get(), *fs, &prefetch_buffer, file_size,
&footer);

View File

@ -1330,7 +1330,7 @@ class FileChecksumTestHelper {
uint64_t offset = 0;
Status s;
s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
if (!s.ok()) {
return s;
}
@ -1338,8 +1338,7 @@ class FileChecksumTestHelper {
file_checksum_generator->Update(scratch.get(), result.size());
offset += static_cast<uint64_t>(result.size());
s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
if (!s.ok()) {
return s;
}
@ -5396,16 +5395,13 @@ TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
IOOptions opts;
buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */,
Env::IO_TOTAL /* rate_limiter_priority */);
nullptr /* status */);
buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */,
Env::IO_TOTAL /* rate_limiter_priority */);
nullptr /* status */);
buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */,
10 /* n */, nullptr /* result */,
nullptr /* status */,
Env::IO_TOTAL /* rate_limiter_priority */);
nullptr /* status */);
ASSERT_EQ(480, buffer.min_offset_read());
}

View File

@ -169,6 +169,7 @@ default_params = {
),
"level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
"verify_checksum_one_in": 1000000,
"verify_file_checksums_one_in": 1000000,
"verify_db_one_in": 100000,
"continuous_verification_interval": 0,
"max_key_len": 3,
@ -658,6 +659,8 @@ def finalize_and_sanitize(src_params):
dest_params["ingest_external_file_one_in"] = 0
dest_params["use_merge"] = 0
dest_params["use_full_merge_v1"] = 0
if dest_params["file_checksum_impl"] == "none":
dest_params["verify_file_checksums_one_in"] = 0
return dest_params

View File

@ -0,0 +1 @@
Statistics `rocksdb.sst.read.micros` now includes time spent on multi read and async read into the file

View File

@ -0,0 +1 @@
New statistics `rocksdb.file.read.{db.open|get|multiget|db.iterator|verify.checksum|verify.file.checksums}.micros` measure read time of block-based SST tables or blob files during db open, `Get()`, `MultiGet()`, using db iterator, `VerifyFileChecksums()` and `VerifyChecksum()`. They require stats level greater than `StatsLevel::kExceptDetailedTimers`.

View File

@ -39,7 +39,14 @@ static OperationInfo global_operation_table[] = {
{ThreadStatus::OP_UNKNOWN, ""},
{ThreadStatus::OP_COMPACTION, "Compaction"},
{ThreadStatus::OP_FLUSH, "Flush"},
{ThreadStatus::OP_DBOPEN, "DBOpen"}};
{ThreadStatus::OP_DBOPEN, "DBOpen"},
{ThreadStatus::OP_GET, "Get"},
{ThreadStatus::OP_MULTIGET, "MultiGet"},
{ThreadStatus::OP_DBITERATOR, "DBIterator"},
{ThreadStatus::OP_VERIFY_DB_CHECKSUM, "VerifyDBChecksum"},
{ThreadStatus::OP_VERIFY_FILE_CHECKSUMS, "VerifyFileChecksums"},
};
struct OperationStageInfo {
const ThreadStatus::OperationStage stage;

View File

@ -1384,28 +1384,46 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
return s;
}
std::vector<Status> BlobDBImpl::MultiGet(const ReadOptions& read_options,
std::vector<Status> BlobDBImpl::MultiGet(const ReadOptions& _read_options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) {
StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
ReadOptions ro(read_options);
bool snapshot_created = SetSnapshotIfNeeded(&ro);
std::vector<Status> statuses;
statuses.reserve(keys.size());
std::size_t num_keys = keys.size();
statuses.reserve(num_keys);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
statuses.push_back(s);
}
return statuses;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
bool snapshot_created = SetSnapshotIfNeeded(&read_options);
values->clear();
values->reserve(keys.size());
PinnableSlice value;
for (size_t i = 0; i < keys.size(); i++) {
statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
statuses.push_back(
GetImpl(read_options, DefaultColumnFamily(), keys[i], &value));
values->push_back(value.ToString());
value.Reset();
}
if (snapshot_created) {
db_->ReleaseSnapshot(ro.snapshot);
db_->ReleaseSnapshot(read_options.snapshot);
}
return statuses;
}
@ -1544,12 +1562,12 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
if (reader->use_direct_io()) {
s = reader->Read(IOOptions(), record_offset,
static_cast<size_t>(record_size), &blob_record, nullptr,
&aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */);
&aligned_buf);
} else {
buf.reserve(static_cast<size_t>(record_size));
s = reader->Read(IOOptions(), record_offset,
static_cast<size_t>(record_size), &blob_record, &buf[0],
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
}
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
}
@ -1609,16 +1627,36 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
return Status::OK();
}
Status BlobDBImpl::Get(const ReadOptions& read_options,
Status BlobDBImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return Get(read_options, column_family, key, value,
static_cast<uint64_t*>(nullptr) /*expiration*/);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value);
}
Status BlobDBImpl::Get(const ReadOptions& read_options,
Status BlobDBImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, uint64_t* expiration) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_GET);
return GetImpl(read_options, column_family, key, value, expiration);
@ -1631,11 +1669,6 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
if (read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
}
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
// TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
@ -2040,11 +2073,16 @@ void BlobDBImpl::CopyBlobFiles(
}
}
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
Iterator* BlobDBImpl::NewIterator(const ReadOptions& _read_options) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())

View File

@ -103,12 +103,13 @@ class BlobDBImpl : public BlobDB {
const Slice& value) override;
using BlobDB::Get;
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
uint64_t* expiration) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, uint64_t* expiration) override;
using BlobDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
@ -123,7 +124,7 @@ class BlobDBImpl : public BlobDB {
using BlobDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& read_options, const std::vector<Slice>& keys,
const ReadOptions& _read_options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using BlobDB::Write;

View File

@ -102,8 +102,8 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) {
}
buffer_.reset(new char[buffer_size_]);
}
Status s = reader_->Read(IOOptions(), offset, size, result, buffer_.get(),
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
Status s =
reader_->Read(IOOptions(), offset, size, result, buffer_.get(), nullptr);
if (!s.ok()) {
return s;
}
@ -277,4 +277,3 @@ std::string BlobDumpTool::GetString(std::pair<T, T> p) {
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE

View File

@ -114,13 +114,11 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) {
// TODO: rate limit reading footers from blob files.
if (ra_file_reader_->use_direct_io()) {
s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
&result, nullptr, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
&result, nullptr, &aligned_buf);
} else {
buf.reserve(BlobLogFooter::kSize + 10);
s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
&result, &buf[0], nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
&result, &buf[0], nullptr);
}
if (!s.ok()) return s;
if (result.size() != BlobLogFooter::kSize) {
@ -238,13 +236,11 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
// TODO: rate limit reading headers from blob files.
if (file_reader->use_direct_io()) {
s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
nullptr, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
nullptr, &aligned_buf);
} else {
header_buf.reserve(BlobLogHeader::kSize);
s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
&header_buf[0], nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
&header_buf[0], nullptr);
}
if (!s.ok()) {
ROCKS_LOG_ERROR(
@ -281,13 +277,12 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
if (file_reader->use_direct_io()) {
s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
BlobLogFooter::kSize, &footer_slice, nullptr,
&aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
&aligned_buf);
} else {
footer_buf.reserve(BlobLogFooter::kSize);
s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
}
if (!s.ok()) {
ROCKS_LOG_ERROR(

View File

@ -249,8 +249,7 @@ class FromFileCacheDumpReader : public CacheDumpReader {
while (to_read > 0) {
io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
buffer_, nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
buffer_, nullptr);
if (!io_s.ok()) {
return io_s;
}

View File

@ -236,7 +236,7 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
Slice result;
Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
if (!s.ok()) {
Error(log_, "Error reading from file %s. %s", Path().c_str(),
s.ToString().c_str());
@ -605,4 +605,3 @@ void ThreadedWriter::DispatchIO(const IO& io) {
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -42,8 +42,7 @@ Status FileTraceReader::Reset() {
Status FileTraceReader::Read(std::string* data) {
assert(file_reader_ != nullptr);
Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
&result_, buffer_, nullptr,
Env::IO_TOTAL /* rate_limiter_priority */);
&result_, buffer_, nullptr);
if (!s.ok()) {
return s;
}
@ -68,7 +67,7 @@ Status FileTraceReader::Read(std::string* data) {
bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
while (to_read > 0) {
s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_,
nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
nullptr);
if (!s.ok()) {
return s;
}

View File

@ -232,27 +232,56 @@ Status TransactionBaseImpl::PopSavePoint() {
return write_batch_.PopSavePoint();
}
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
if (read_options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
auto s = GetImpl(read_options, column_family, key, value);
return s;
}
Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
assert(value != nullptr);
PinnableSlice pinnable_val(value);
assert(!pinnable_val.IsPinned());
auto s = Get(read_options, column_family, key, &pinnable_val);
auto s = GetImpl(read_options, column_family, key, &pinnable_val);
if (s.ok() && pinnable_val.IsPinned()) {
value->assign(pinnable_val.data(), pinnable_val.size());
} // else value is already assigned
return s;
}
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, pinnable_val);
}
Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key,
PinnableSlice* pinnable_val) {
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
pinnable_val);
}
@ -279,7 +308,7 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
assert(value != nullptr);
PinnableSlice pinnable_val(value);
assert(!pinnable_val.IsPinned());
s = Get(read_options, column_family, key, &pinnable_val);
s = GetImpl(read_options, column_family, key, &pinnable_val);
if (s.ok() && pinnable_val.IsPinned()) {
value->assign(pinnable_val.data(), pinnable_val.size());
} // else value is already assigned
@ -307,39 +336,63 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
if (s.ok() && pinnable_val != nullptr) {
s = Get(read_options, column_family, key, pinnable_val);
s = GetImpl(read_options, column_family, key, pinnable_val);
}
return s;
}
std::vector<Status> TransactionBaseImpl::MultiGet(
const ReadOptions& read_options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
size_t num_keys = keys.size();
if (read_options.io_activity != Env::IOActivity::kUnknown) {
std::vector<Status> stat_list(num_keys);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Cannot call MultiGet with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
return std::vector<Status>(num_keys, s);
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = s;
}
return stat_list;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
values->resize(num_keys);
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
stat_list[i] =
GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
}
return stat_list;
}
void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
void TransactionBaseImpl::MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
assert(read_options.io_activity == Env::IOActivity::kUnknown);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
num_keys, keys, values, statuses,
sorted_input);
@ -349,7 +402,6 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
if (read_options.io_activity != Env::IOActivity::kUnknown) {
Status s = Status::InvalidArgument(
@ -357,6 +409,7 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
"`Env::IOActivity::kUnknown`");
return std::vector<Status>(num_keys, s);
}
// Regardless of whether the MultiGet succeeded, track these keys.
values->resize(num_keys);
// Lock all keys
@ -372,7 +425,8 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
stat_list[i] =
GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
}
return stat_list;

View File

@ -53,11 +53,13 @@ class TransactionBaseImpl : public Transaction {
Status PopSavePoint() override;
using Transaction::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override {
@ -84,7 +86,7 @@ class TransactionBaseImpl : public Transaction {
using Transaction::MultiGet;
std::vector<Status> MultiGet(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
@ -98,9 +100,10 @@ class TransactionBaseImpl : public Transaction {
keys, values);
}
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input = false) override;
void MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
using Transaction::MultiGetForUpdate;
std::vector<Status> MultiGetForUpdate(
@ -260,6 +263,13 @@ class TransactionBaseImpl : public Transaction {
LockTracker& GetTrackedLocks() { return *tracked_locks_; }
protected:
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
virtual Status GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
// Add a key to the list of tracked keys.
//
// seqno is the earliest seqno this key was involved with this transaction.
@ -379,4 +389,3 @@ class TransactionBaseImpl : public Transaction {
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -39,20 +39,37 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
prepare_batch_cnt_ = 0;
}
void WritePreparedTxn::MultiGet(const ReadOptions& options,
void WritePreparedTxn::MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
assert(options.io_activity == Env::IOActivity::kUnknown);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(
read_options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
backed_by_snapshot);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
num_keys, keys, values, statuses,
sorted_input, &callback);
if (UNLIKELY(!callback.valid() ||
!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
@ -62,14 +79,27 @@ void WritePreparedTxn::MultiGet(const ReadOptions& options,
}
}
Status WritePreparedTxn::Get(const ReadOptions& options,
Status WritePreparedTxn::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
if (options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, pinnable_val);
}
Status WritePreparedTxn::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key,
PinnableSlice* pinnable_val) {
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);

View File

@ -51,12 +51,12 @@ class WritePreparedTxn : public PessimisticTransaction {
// seq in the WAL that is also published, LastPublishedSequence, as opposed to
// the last seq in the memtable.
using Transaction::Get;
virtual Status Get(const ReadOptions& options,
virtual Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::MultiGet;
virtual void MultiGet(const ReadOptions& options,
virtual void MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
@ -86,6 +86,10 @@ class WritePreparedTxn : public PessimisticTransaction {
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTxn;
using Transaction::GetImpl;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override;

View File

@ -247,14 +247,26 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
return s;
}
Status WritePreparedTxnDB::Get(const ReadOptions& options,
Status WritePreparedTxnDB::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
if (options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value);
}
Status WritePreparedTxnDB::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
@ -314,16 +326,35 @@ void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
}
std::vector<Status> WritePreparedTxnDB::MultiGet(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
assert(values);
size_t num_keys = keys.size();
std::vector<Status> stat_list(num_keys);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = s;
}
return stat_list;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
values->resize(num_keys);
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
stat_list[i] = this->Get(options, column_family[i], keys[i], &(*values)[i]);
stat_list[i] =
this->GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
}
return stat_list;
}
@ -346,22 +377,27 @@ static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
}
} // anonymous namespace
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) {
if (options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
constexpr bool expose_blob_index = false;
constexpr bool allow_refresh = false;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
if (read_options.snapshot != nullptr) {
snapshot_seq = read_options.snapshot->GetSequenceNumber();
min_uncommitted =
static_cast_with_check<const SnapshotImpl>(options.snapshot)
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
->min_uncommitted_;
} else {
auto* snapshot = GetSnapshot();
@ -377,26 +413,37 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
expose_blob_index, allow_refresh);
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, snapshot_seq,
&state->callback, expose_blob_index,
allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return Status::InvalidArgument(
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
constexpr bool expose_blob_index = false;
constexpr bool allow_refresh = false;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
if (read_options.snapshot != nullptr) {
snapshot_seq = read_options.snapshot->GetSequenceNumber();
min_uncommitted =
static_cast_with_check<const SnapshotImpl>(options.snapshot)
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
->min_uncommitted_;
} else {
auto* snapshot = GetSnapshot();
@ -414,8 +461,8 @@ Status WritePreparedTxnDB::NewIterators(
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
auto* db_iter = db_impl_->NewIteratorImpl(read_options, cfd, snapshot_seq,
&state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);

View File

@ -83,24 +83,24 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
size_t batch_cnt, WritePreparedTxn* txn);
using DB::Get;
virtual Status Get(const ReadOptions& options,
virtual Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
virtual Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) override;
using DB::NewIterators;
virtual Status NewIterators(
const ReadOptions& options,
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
@ -520,6 +520,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
}
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
assert(value != nullptr);
PinnableSlice pinnable_val(value);
assert(!pinnable_val.IsPinned());
auto s = GetImpl(options, column_family, key, &pinnable_val);
if (s.ok() && pinnable_val.IsPinned()) {
value->assign(pinnable_val.data(), pinnable_val.size());
} // else value is already assigned
return s;
}
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value);
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {

View File

@ -943,20 +943,36 @@ Status WriteUnpreparedTxn::PopSavePoint() {
return Status::NotFound();
}
void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
void WriteUnpreparedTxn::MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
assert(options.io_activity == Env::IOActivity::kUnknown);
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(
read_options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_, backed_by_snapshot);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
num_keys, keys, values, statuses,
sorted_input, &callback);
if (UNLIKELY(!callback.valid() ||
!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
@ -966,14 +982,26 @@ void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
}
}
Status WriteUnpreparedTxn::Get(const ReadOptions& options,
Status WriteUnpreparedTxn::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
if (options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Cannot call Get with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`");
"Can only call Get with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value);
}
Status WriteUnpreparedTxn::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);

View File

@ -184,12 +184,12 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
// Get and GetIterator needs to be overridden so that a ReadCallback to
// handle read-your-own-write is used.
using Transaction::Get;
virtual Status Get(const ReadOptions& options,
virtual Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::MultiGet;
virtual void MultiGet(const ReadOptions& options,
virtual void MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
@ -211,6 +211,10 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
friend class WriteUnpreparedTxnDB;
const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
using Transaction::GetImpl;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status WriteRollbackKeys(const LockTracker& tracked_keys,
WriteBatchWithIndex* rollback_batch,
ReadCallback* callback, const ReadOptions& roptions);
@ -336,4 +340,3 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -385,13 +385,19 @@ static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
}
} // anonymous namespace
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn) {
if (options.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
// TODO(lth): Refactor so that this logic is shared with WritePrepared.
constexpr bool expose_blob_index = false;
@ -431,11 +437,11 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
// max_visible_seq, and then return the last visible value, so that this
// restriction can be lifted.
const Snapshot* snapshot = nullptr;
if (options.snapshot == nullptr) {
if (read_options.snapshot == nullptr) {
snapshot = GetSnapshot();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
} else {
snapshot = options.snapshot;
snapshot = read_options.snapshot;
}
snapshot_seq = snapshot->GetSequenceNumber();
@ -467,8 +473,8 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
auto* db_iter = db_impl_->NewIteratorImpl(
options, cfd, state->MaxVisibleSeq(), &state->callback, expose_blob_index,
allow_refresh);
read_options, cfd, state->MaxVisibleSeq(), &state->callback,
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;
}

View File

@ -27,7 +27,7 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
struct IteratorState;
using WritePreparedTxnDB::NewIterator;
Iterator* NewIterator(const ReadOptions& options,
Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn);

View File

@ -594,14 +594,19 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
}
}
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) {
if (opts.io_activity != Env::IOActivity::kUnknown) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kDBIterator) {
return NewErrorIterator(Status::InvalidArgument(
"Cannot call NewIterator with `ReadOptions::io_activity` != "
"`Env::IOActivity::kUnknown`"));
"Can only call NewIterator with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
}
return new TtlIterator(db_->NewIterator(opts, column_family));
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kDBIterator;
}
return new TtlIterator(db_->NewIterator(read_options, column_family));
}
void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) {

View File

@ -78,7 +78,7 @@ class DBWithTTLImpl : public DBWithTTL {
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
using StackableDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
virtual Iterator* NewIterator(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family) override;
virtual DB* GetBaseDB() override { return db_; }

View File

@ -531,7 +531,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
// Did not find key in batch OR could not resolve Merges. Try DB.
if (!callback) {
s = db->Get(read_options, column_family, key, pinnable_val);
s = static_cast_with_check<DBImpl>(db->GetRootDB())
->GetImpl(read_options, column_family, key, pinnable_val);
} else {
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;